Skip to content

Commit 5ce1ab9

Browse files
committed
added resource monitoring with configurable CPU and memory thresholds
1 parent 4fb2da7 commit 5ce1ab9

File tree

5 files changed

+132
-37
lines changed

5 files changed

+132
-37
lines changed

src/cli.rs

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -317,6 +317,25 @@ pub struct Options {
317317
)]
318318
pub parquet_compression: Compression,
319319

320+
// Resource monitoring
321+
#[arg(
322+
long,
323+
env = "P_CPU_THRESHOLD",
324+
default_value = "80.0",
325+
value_parser = validation::validate_percentage,
326+
help = "CPU utilization threshold percentage (0.0-100.0) for resource monitoring"
327+
)]
328+
pub cpu_utilization_threshold: f32,
329+
330+
#[arg(
331+
long,
332+
env = "P_MEMORY_THRESHOLD",
333+
default_value = "80.0",
334+
value_parser = validation::validate_percentage,
335+
help = "Memory utilization threshold percentage (0.0-100.0) for resource monitoring"
336+
)]
337+
pub memory_utilization_threshold: f32,
338+
320339
// Integration features
321340
#[arg(
322341
long,

src/handlers/http/modal/ingest_server.rs

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,7 @@ use crate::{
3939
http::{
4040
base_path, ingest, logstream,
4141
middleware::{DisAllowRootUser, RouteExt},
42-
role,
42+
resource_check, role,
4343
},
4444
},
4545
migration,
@@ -126,12 +126,18 @@ impl ParseableServer for IngestServer {
126126
let (cancel_tx, cancel_rx) = oneshot::channel();
127127
thread::spawn(|| sync::handler(cancel_rx));
128128

129+
// Start resource monitor
130+
let (resource_shutdown_tx, resource_shutdown_rx) = oneshot::channel();
131+
resource_check::spawn_resource_monitor(resource_shutdown_rx);
132+
129133
tokio::spawn(airplane::server());
130134

131135
// Ingestors shouldn't have to deal with OpenId auth flow
132136
let result = self.start(shutdown_rx, prometheus.clone(), None).await;
133137
// Cancel sync jobs
134138
cancel_tx.send(()).expect("Cancellation should not fail");
139+
// Shutdown resource monitor
140+
let _ = resource_shutdown_tx.send(());
135141
if let Err(join_err) = startup_sync_handle.await {
136142
tracing::warn!("startup sync task panicked: {join_err}");
137143
}

src/handlers/http/modal/server.rs

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ use crate::handlers::http::alerts;
2525
use crate::handlers::http::base_path;
2626
use crate::handlers::http::health_check;
2727
use crate::handlers::http::prism_base_path;
28+
use crate::handlers::http::resource_check;
2829
use crate::handlers::http::query;
2930
use crate::handlers::http::users::dashboards;
3031
use crate::handlers::http::users::filters;
@@ -138,6 +139,10 @@ impl ParseableServer for Server {
138139
let (cancel_tx, cancel_rx) = oneshot::channel();
139140
thread::spawn(|| sync::handler(cancel_rx));
140141

142+
// Start resource monitor
143+
let (resource_shutdown_tx, resource_shutdown_rx) = oneshot::channel();
144+
resource_check::spawn_resource_monitor(resource_shutdown_rx);
145+
141146
if PARSEABLE.options.send_analytics {
142147
analytics::init_analytics_scheduler()?;
143148
}
@@ -150,6 +155,8 @@ impl ParseableServer for Server {
150155
.await;
151156
// Cancel sync jobs
152157
cancel_tx.send(()).expect("Cancellation should not fail");
158+
// Shutdown resource monitor
159+
let _ = resource_shutdown_tx.send(());
153160
if let Err(join_err) = startup_sync_handle.await {
154161
tracing::warn!("startup sync task panicked: {join_err}");
155162
}

src/handlers/http/resource_check.rs

Lines changed: 87 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -23,51 +23,102 @@ use actix_web::{
2323
error::ErrorServiceUnavailable,
2424
middleware::Next,
2525
};
26-
use sysinfo::System;
27-
use tracing::warn;
26+
use tokio::{select, time::{interval, Duration}};
27+
use tokio::sync::RwLock;
28+
use tracing::{warn, trace, info};
2829

29-
const CPU_UTILIZATION_THRESHOLD: f32 = 90.0;
30-
const MEMORY_UTILIZATION_THRESHOLD: f32 = 90.0;
30+
use crate::analytics::{SYS_INFO, refresh_sys_info};
31+
use crate::parseable::PARSEABLE;
32+
33+
/// Shared flag holding the latest resource verdict: `true` means utilization is within
/// the configured thresholds and requests are accepted, `false` means requests are
/// rejected. Starts as `true`; written by the background resource monitor and read by
/// the resource-check middleware.
static RESOURCE_CHECK_ENABLED: RwLock<bool> = RwLock::const_new(true);
34+
35+
/// Spawn a background task to monitor system resources
36+
pub fn spawn_resource_monitor(shutdown_rx: tokio::sync::oneshot::Receiver<()>) {
37+
tokio::spawn(async move {
38+
let mut check_interval = interval(Duration::from_secs(30));
39+
let mut shutdown_rx = shutdown_rx;
40+
41+
let cpu_threshold = PARSEABLE.options.cpu_utilization_threshold;
42+
let memory_threshold = PARSEABLE.options.memory_utilization_threshold;
43+
44+
info!("Resource monitor started with thresholds - CPU: {:.1}%, Memory: {:.1}%",
45+
cpu_threshold, memory_threshold);
46+
loop {
47+
select! {
48+
_ = check_interval.tick() => {
49+
trace!("Checking system resource utilization...");
50+
51+
refresh_sys_info();
52+
let (used_memory, total_memory, cpu_usage) = {
53+
let sys = SYS_INFO.lock().unwrap();
54+
let used_memory = sys.used_memory() as f32;
55+
let total_memory = sys.total_memory() as f32;
56+
let cpu_usage = sys.global_cpu_usage();
57+
(used_memory, total_memory, cpu_usage)
58+
};
59+
60+
let mut resource_ok = true;
61+
62+
// Calculate memory usage percentage
63+
let memory_usage = if total_memory > 0.0 {
64+
(used_memory / total_memory) * 100.0
65+
} else {
66+
0.0
67+
};
68+
69+
// Log current resource usage every few checks for debugging
70+
info!("Current resource usage - CPU: {:.1}%, Memory: {:.1}% ({:.1}GB/{:.1}GB)",
71+
cpu_usage, memory_usage,
72+
used_memory / 1024.0 / 1024.0 / 1024.0,
73+
total_memory / 1024.0 / 1024.0 / 1024.0);
74+
75+
// Check memory utilization
76+
if memory_usage > memory_threshold {
77+
warn!("High memory usage detected: {:.1}% (threshold: {:.1}%)",
78+
memory_usage, memory_threshold);
79+
resource_ok = false;
80+
}
81+
82+
// Check CPU utilization
83+
if cpu_usage > cpu_threshold {
84+
warn!("High CPU usage detected: {:.1}% (threshold: {:.1}%)",
85+
cpu_usage, cpu_threshold);
86+
resource_ok = false;
87+
}
88+
89+
let previous_state = *RESOURCE_CHECK_ENABLED.read().await;
90+
*RESOURCE_CHECK_ENABLED.write().await = resource_ok;
91+
92+
// Log state changes
93+
if previous_state != resource_ok {
94+
if resource_ok {
95+
info!("Resource utilization back to normal - requests will be accepted");
96+
} else {
97+
warn!("Resource utilization too high - requests will be rejected");
98+
}
99+
}
100+
},
101+
_ = &mut shutdown_rx => {
102+
trace!("Resource monitor shutting down");
103+
break;
104+
}
105+
}
106+
}
107+
});
108+
}
31109

32110
/// Middleware to check system resource utilization before processing requests
33-
/// Returns 503 Service Unavailable if CPU or memory usage exceeds thresholds
111+
/// Returns 503 Service Unavailable if resources are over-utilized
34112
pub async fn check_resource_utilization_middleware(
35113
req: ServiceRequest,
36114
next: Next<impl MessageBody>,
37115
) -> Result<ServiceResponse<impl MessageBody>, Error> {
38116

39-
let mut sys = System::new_all();
40-
sys.refresh_cpu_usage();
41-
sys.refresh_memory();
117+
let resource_ok = *RESOURCE_CHECK_ENABLED.read().await;
42118

43-
let used_memory = sys.used_memory() as f32;
44-
let total_memory = sys.total_memory() as f32;
45-
46-
// Check memory utilization
47-
if total_memory > 0.0 {
48-
let memory_usage = (used_memory / total_memory) * 100.0;
49-
if memory_usage > MEMORY_UTILIZATION_THRESHOLD {
50-
let error_msg = format!("Memory is over-utilized: {:.1}%", memory_usage);
51-
warn!(
52-
"Rejecting request to {} due to high memory usage: {:.1}% (threshold: {:.1}%)",
53-
req.path(),
54-
memory_usage,
55-
MEMORY_UTILIZATION_THRESHOLD
56-
);
57-
return Err(ErrorServiceUnavailable(error_msg));
58-
}
59-
}
60-
61-
// Check CPU utilization
62-
let cpu_usage = sys.global_cpu_usage();
63-
if cpu_usage > CPU_UTILIZATION_THRESHOLD {
64-
let error_msg = format!("CPU is over-utilized: {:.1}%", cpu_usage);
65-
warn!(
66-
"Rejecting request to {} due to high CPU usage: {:.1}% (threshold: {:.1}%)",
67-
req.path(),
68-
cpu_usage,
69-
CPU_UTILIZATION_THRESHOLD
70-
);
119+
if !resource_ok {
120+
let error_msg = "Server resources over-utilized";
121+
warn!("Rejecting request to {} due to resource constraints", req.path());
71122
return Err(ErrorServiceUnavailable(error_msg));
72123
}
73124

src/option.rs

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -175,6 +175,18 @@ pub mod validation {
175175
}
176176
}
177177

178+
/// Parses and validates a percentage supplied as text.
///
/// Leading and trailing whitespace is ignored, so a value such as `" 80.0 "` is accepted.
///
/// # Errors
///
/// Returns an error message when the text is not a decimal number, or when the parsed
/// value lies outside the inclusive range `0.0..=100.0` (this also rejects `NaN` and
/// infinities, which are never contained in the range).
pub fn validate_percentage(percentage: &str) -> Result<f32, String> {
    match percentage.trim().parse::<f32>() {
        Ok(value) if (0.0..=100.0).contains(&value) => Ok(value),
        Ok(_) => Err("Invalid percentage value. It should be between 0.0 and 100.0".to_string()),
        Err(_) => {
            Err("Invalid percentage value. It should be a decimal number like 80.0".to_string())
        }
    }
}
189+
178190
pub fn validate_dataset_fields_allowed_limit(s: &str) -> Result<usize, String> {
179191
if let Ok(size) = s.parse::<usize>() {
180192
if (1..=DATASET_FIELD_COUNT_LIMIT).contains(&size) {

0 commit comments

Comments
 (0)