Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 13 additions & 1 deletion bottlecap/LICENSE-3rdparty.yml
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,19 @@ third_party_libraries:
license: 0BSD OR MIT OR Apache-2.0
licenses:
- license: 0BSD
text: NOT FOUND
text: |
Copyright (C) Jonas Schievink <[email protected]>

Permission to use, copy, modify, and/or distribute this software for
any purpose with or without fee is hereby granted.

THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN
AN ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT
OF OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
- license: MIT
text: |
Permission is hereby granted, free of charge, to any
Expand Down
3 changes: 3 additions & 0 deletions bottlecap/src/metrics/enhanced/constants.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,3 +44,6 @@ pub const THREADS_MAX_METRIC: &str = "aws.lambda.enhanced.threads_max";
pub const THREADS_USE_METRIC: &str = "aws.lambda.enhanced.threads_use";
//pub const ASM_INVOCATIONS_METRIC: &str = "aws.lambda.enhanced.asm.invocations";
pub const ENHANCED_METRICS_ENV_VAR: &str = "DD_ENHANCED_METRICS";

// Monitoring interval for tmp, fd, and threads metrics
pub const MONITORING_INTERVAL: u64 = 10;
125 changes: 38 additions & 87 deletions bottlecap/src/metrics/enhanced/lambda.rs
Original file line number Diff line number Diff line change
Expand Up @@ -450,12 +450,7 @@ impl Lambda {
}
}

#[allow(unreachable_code)]
#[allow(unused_variables)]
#[allow(unused_mut)]
pub fn set_tmp_enhanced_metrics(&self, mut send_metrics: Receiver<()>) {
// Temporarily disabled
return;
if !self.config.enhanced_metrics {
return;
}
Expand All @@ -475,7 +470,7 @@ impl Lambda {
let tmp_max = bsize * blocks;
let mut tmp_used = bsize * (blocks - bavail);

let mut interval = interval(Duration::from_millis(10));
let mut interval = interval(Duration::from_millis(constants::MONITORING_INTERVAL));
loop {
tokio::select! {
biased;
Expand All @@ -502,9 +497,11 @@ impl Lambda {
});
}

pub fn generate_fd_enhanced_metrics(
pub fn generate_process_metrics(
fd_max: f64,
fd_use: f64,
threads_max: f64,
threads_use: f64,
aggr: &mut std::sync::MutexGuard<Aggregator>,
tags: Option<SortedTags>,
) {
Expand All @@ -522,20 +519,15 @@ impl Lambda {
let metric = Metric::new(
constants::FD_USE_METRIC.into(),
MetricValue::distribution(fd_use),
tags,
tags.clone(),
);
if let Err(e) = aggr.insert(metric) {
error!("Failed to insert fd_use metric: {}", e);
}
} else {
debug!("Could not get file descriptor usage data.");
}
}

pub fn generate_threads_enhanced_metrics(
threads_max: f64,
threads_use: f64,
aggr: &mut std::sync::MutexGuard<Aggregator>,
tags: Option<SortedTags>,
) {
let metric = Metric::new(
constants::THREADS_MAX_METRIC.into(),
MetricValue::distribution(threads_max),
Expand All @@ -555,15 +547,12 @@ impl Lambda {
if let Err(e) = aggr.insert(metric) {
error!("Failed to insert threads_use metric: {}", e);
}
} else {
debug!("Could not get thread usage data.");
}
}

#[allow(unreachable_code)]
#[allow(unused_variables)]
#[allow(unused_mut)]
pub fn set_process_enhanced_metrics(&self, mut send_metrics: Receiver<()>) {
// Temporarily disabled
return;
if !self.config.enhanced_metrics {
return;
}
Expand All @@ -573,46 +562,36 @@ impl Lambda {

tokio::spawn(async move {
// get list of all process ids
let pids = proc::get_pid_list();
let mut pids = proc::get_pid_list();

// Set fd_max and initial value for fd_use to -1
// Set fd_max and initial value for fd_use
let fd_max = proc::get_fd_max_data(&pids);
let mut fd_use = -1_f64;
let mut fd_use = proc::get_fd_use_data(&pids).unwrap_or_else(|_| -1_f64);

// Set threads_max and initial value for threads_use to -1
// Set threads_max and initial value for threads_use
let threads_max = proc::get_threads_max_data(&pids);
let mut threads_use = -1_f64;
let mut threads_use = proc::get_threads_use_data(&pids).unwrap_or_else(|_| -1_f64);

let mut interval = interval(Duration::from_millis(1));
let mut interval = interval(Duration::from_millis(constants::MONITORING_INTERVAL));
loop {
tokio::select! {
biased;
// When the stop signal is received, generate final metrics
_ = send_metrics.changed() => {
let mut aggr: std::sync::MutexGuard<Aggregator> =
aggr.lock().expect("lock poisoned");
Self::generate_fd_enhanced_metrics(fd_max, fd_use, &mut aggr, tags.clone());
Self::generate_threads_enhanced_metrics(threads_max, threads_use, &mut aggr, tags);
Self::generate_process_metrics(fd_max, fd_use, threads_max, threads_use, &mut aggr, tags.clone());
return;
}
// Otherwise keep monitoring file descriptor and thread usage periodically
_ = interval.tick() => {
match proc::get_fd_use_data(&pids) {
Ok(fd_use_curr) => {
fd_use = fd_use.max(fd_use_curr);
},
Err(_) => {
debug!("Could not update file descriptor use enhanced metric.");
}
};
match proc::get_threads_use_data(&pids) {
Ok(threads_use_curr) => {
threads_use = threads_use.max(threads_use_curr);
},
Err(_) => {
debug!("Could not update threads use enhanced metric.");
}
};
pids = proc::get_pid_list();
if let Ok(fd_use_curr) = proc::get_fd_use_data(&pids) {
fd_use = fd_use.max(fd_use_curr);
}
if let Ok(threads_use_curr) = proc::get_threads_use_data(&pids) {
threads_use = threads_use.max(threads_use_curr);
}
}
}
}
Expand Down Expand Up @@ -1028,84 +1007,56 @@ mod tests {
}

#[test]
fn test_set_fd_enhanced_metrics_valid_fd_use() {
fn test_set_process_enhanced_metrics_valid_use() {
let (metrics_aggr, my_config) = setup();
let lambda = Lambda::new(metrics_aggr.clone(), my_config);

let fd_max = 1024.0;
let fd_use = 175.0;
let threads_max = 1024.0;
let threads_use = 40.0;

Lambda::generate_fd_enhanced_metrics(
Lambda::generate_process_metrics(
fd_max,
fd_use,
threads_max,
threads_use,
&mut lambda.aggregator.lock().expect("lock poisoned"),
None,
);

assert_sketch(&metrics_aggr, constants::FD_MAX_METRIC, 1024.0);
assert_sketch(&metrics_aggr, constants::FD_USE_METRIC, 175.0);
assert_sketch(&metrics_aggr, constants::THREADS_MAX_METRIC, 1024.0);
assert_sketch(&metrics_aggr, constants::THREADS_USE_METRIC, 40.0);
}

#[test]
fn test_set_fd_enhanced_metrics_invalid_fd_use() {
fn test_set_process_enhanced_metrics_invalid_use() {
let (metrics_aggr, my_config) = setup();
let lambda = Lambda::new(metrics_aggr.clone(), my_config);

let fd_max = 1024.0;
let fd_use = -1.0;
let threads_max = 1024.0;
let threads_use = -1.0;

Lambda::generate_fd_enhanced_metrics(
Lambda::generate_process_metrics(
fd_max,
fd_use,
threads_max,
threads_use,
&mut lambda.aggregator.lock().expect("lock poisoned"),
None,
);

assert_sketch(&metrics_aggr, constants::FD_MAX_METRIC, 1024.0);
assert_sketch(&metrics_aggr, constants::THREADS_MAX_METRIC, 1024.0);

let aggr = lambda.aggregator.lock().expect("lock poisoned");
assert!(aggr
.get_entry_by_id(constants::FD_USE_METRIC.into(), &None)
.is_none());
}

#[test]
fn test_set_threads_enhanced_metrics_valid_threads_use() {
let (metrics_aggr, my_config) = setup();
let lambda = Lambda::new(metrics_aggr.clone(), my_config);

let threads_max = 1024.0;
let threads_use = 40.0;

Lambda::generate_threads_enhanced_metrics(
threads_max,
threads_use,
&mut lambda.aggregator.lock().expect("lock poisoned"),
None,
);

assert_sketch(&metrics_aggr, constants::THREADS_MAX_METRIC, 1024.0);
assert_sketch(&metrics_aggr, constants::THREADS_USE_METRIC, 40.0);
}

#[test]
fn test_set_threads_enhanced_metrics_invalid_threads_use() {
let (metrics_aggr, my_config) = setup();
let lambda = Lambda::new(metrics_aggr.clone(), my_config);

let threads_max = 1024.0;
let threads_use = -1.0;

Lambda::generate_threads_enhanced_metrics(
threads_max,
threads_use,
&mut lambda.aggregator.lock().expect("lock poisoned"),
None,
);

assert_sketch(&metrics_aggr, constants::THREADS_MAX_METRIC, 1024.0);

let aggr = lambda.aggregator.lock().expect("lock poisoned");
assert!(aggr
.get_entry_by_id(constants::THREADS_USE_METRIC.into(), &None)
.is_none());
Expand Down
Loading