diff --git a/sds/src/lib.rs b/sds/src/lib.rs index 4527fe2b..72dc2a67 100644 --- a/sds/src/lib.rs +++ b/sds/src/lib.rs @@ -45,8 +45,8 @@ pub use scanner::shared_pool::{SharedPool, SharedPoolGuard}; pub use scanner::suppression::Suppressions; pub use scanner::{ CompiledRule, MatchEmitter, Precedence, RootCompiledRule, RootRuleConfig, RuleResult, - RuleStatus, ScanOptionBuilder, Scanner, ScannerBuilder, SharedData, StringMatch, - StringMatchesCtx, + RuleStatus, ScanMetrics, ScanOptionBuilder, ScanResult, Scanner, ScannerBuilder, SharedData, + StringMatch, StringMatchesCtx, config::RuleConfig, error::{CreateScannerError, ScannerError}, regex_rule::config::{ diff --git a/sds/src/scanner/metrics.rs b/sds/src/scanner/metrics.rs index bd450a8f..66658f82 100644 --- a/sds/src/scanner/metrics.rs +++ b/sds/src/scanner/metrics.rs @@ -1,5 +1,5 @@ use crate::Labels; -use metrics::{Counter, counter}; +use metrics::{Counter, Histogram, counter, histogram}; pub struct RuleMetrics { pub false_positive_excluded_attributes: Counter, @@ -21,15 +21,17 @@ pub struct ScannerMetrics { pub duration_ns: Counter, pub match_count: Counter, pub suppressed_match_count: Counter, + pub cpu_duration: Histogram, } impl ScannerMetrics { - pub fn new(labels: &Labels) -> Self { + pub fn new(labels: &Labels, highcard_labels: &Labels) -> Self { ScannerMetrics { num_scanned_events: counter!("scanned_events", labels.clone()), duration_ns: counter!("scanning.duration", labels.clone()), match_count: counter!("scanning.match_count", labels.clone()), suppressed_match_count: counter!("scanning.suppressed_match_count", labels.clone()), + cpu_duration: histogram!("scanning.cpu_duration", highcard_labels.clone()), } } } diff --git a/sds/src/scanner/mod.rs b/sds/src/scanner/mod.rs index ddb45f86..9d97b811 100644 --- a/sds/src/scanner/mod.rs +++ b/sds/src/scanner/mod.rs @@ -257,14 +257,17 @@ impl StringMatchesCtx<'_> { // The future is spawned onto the tokio runtime immediately so it starts running // in the background let fut = TOKIO_RUNTIME.spawn(async move { + let start = Instant::now(); let mut ctx = AsyncStringMatchesCtx { rule_matches: vec![], }; (func)(&mut ctx).await?; + let io_duration = start.elapsed(); Ok(AsyncRuleInfo { rule_index, rule_matches: ctx.rule_matches, + io_duration, }) }); @@ -299,6 +302,20 @@ pub struct PendingRuleJob { pub struct AsyncRuleInfo { rule_index: usize, rule_matches: Vec, + io_duration: Duration, +} + +#[derive(Debug, Clone)] +pub struct ScanMetrics { + pub total_duration: Duration, + pub io_duration: Duration, + pub num_async_rules: usize, +} + +#[derive(Debug)] +pub struct ScanResult { + pub matches: Vec, + pub metrics: ScanMetrics, } /// A rule result that cannot be async @@ -443,6 +460,7 @@ pub struct Scanner { scanner_features: ScannerFeatures, metrics: ScannerMetrics, labels: Labels, + pub highcard_labels: Labels, match_validators_per_type: AHashMap>, per_scanner_data: SharedData, async_scan_timeout: Duration, @@ -470,16 +488,13 @@ impl Scanner { event: &mut E, options: ScanOptions, ) -> Result, ScannerError> { - block_on(self.internal_scan_with_metrics(event, options)) + block_on(self.internal_scan_with_metrics(event, options)).map(|result| result.matches) } // This function scans the given event with the rules configured in the scanner. // The event parameter is a mutable reference to the event that should be scanned (implemented the Event trait). - // The return value is a list of RuleMatch objects, which contain information about the matches that were found. - pub async fn scan_async( - &self, - event: &mut E, - ) -> Result, ScannerError> { + // The return value is a ScanResult containing matches and timing metrics. + pub async fn scan_async(&self, event: &mut E) -> Result { self.scan_async_with_options(event, ScanOptions::default()) .await } @@ -488,7 +503,7 @@ impl Scanner { &self, event: &mut E, options: ScanOptions, - ) -> Result, ScannerError> { + ) -> Result { let fut = self.internal_scan_with_metrics(event, options); // The sleep from the timeout requires being in a tokio context @@ -520,18 +535,36 @@ impl Scanner { &self, event: &mut E, options: ScanOptions, - ) -> Result, ScannerError> { + ) -> Result { let start = Instant::now(); let result = self.internal_scan(event, options).await; - match &result { - Ok(rule_matches) => { - self.record_metrics(rule_matches, start); + match result { + Ok((rule_matches, io_duration, num_async_rules)) => { + self.record_metrics(&rule_matches, start); + let total_duration = start.elapsed(); + + // Calculate CPU duration by subtracting I/O wait time from total duration + let cpu_duration = total_duration.saturating_sub(io_duration); + + // Record CPU duration histogram in nanoseconds + self.metrics + .cpu_duration + .record(cpu_duration.as_nanos() as f64); + + Ok(ScanResult { + matches: rule_matches, + metrics: ScanMetrics { + total_duration, + io_duration, + num_async_rules, + }, + }) } - Err(_) => { + Err(e) => { self.record_metrics(&[], start); + Err(e) } } - result } fn process_rule_matches( @@ -611,7 +644,7 @@ impl Scanner { &self, event: &mut E, options: ScanOptions, - ) -> Result, ScannerError> { + ) -> Result<(Vec, Duration, usize), ScannerError> { // If validation is requested, we need to collect match content even if the scanner // wasn't originally configured to return matches let need_match_content = self.scanner_features.return_matches || options.validate_matches; @@ -639,8 +672,11 @@ impl Scanner { // The async jobs were already spawned on the tokio runtime, so the // results just need to be collected + let num_async_jobs = async_jobs.len(); + let mut total_io_duration = Duration::ZERO; for job in async_jobs { let rule_info = job.fut.await.unwrap()?; + total_io_duration += rule_info.io_duration; rule_matches.push_async_matches( &job.path, rule_info @@ -664,7 +700,7 @@ impl Scanner { self.validate_matches(&mut output_rule_matches); } - Ok(output_rule_matches) + Ok((output_rule_matches, total_io_duration, num_async_jobs)) } pub fn suppress_matches( @@ -926,6 +962,7 @@ impl Drop for Scanner { pub struct ScannerBuilder<'a> { rules: &'a [RootRuleConfig>], labels: Labels, + highcard_labels: Labels, scanner_features: ScannerFeatures, async_scan_timeout: Duration, } @@ -935,6 +972,7 @@ impl ScannerBuilder<'_> { ScannerBuilder { rules, labels: Labels::empty(), + highcard_labels: Labels::empty(), scanner_features: ScannerFeatures::default(), async_scan_timeout: Duration::from_secs(60 * 5), } @@ -945,6 +983,11 @@ impl ScannerBuilder<'_> { self } + pub fn highcard_labels(mut self, labels: Labels) -> Self { + self.highcard_labels = labels; + self + } + pub fn with_async_scan_timeout(mut self, duration: Duration) -> Self { self.async_scan_timeout = duration; self @@ -1065,9 +1108,10 @@ impl ScannerBuilder<'_> { rules: compiled_rules, scoped_ruleset, scanner_features: self.scanner_features, - metrics: ScannerMetrics::new(&self.labels), + metrics: ScannerMetrics::new(&self.labels, &self.highcard_labels), match_validators_per_type, labels: self.labels, + highcard_labels: self.highcard_labels, per_scanner_data, async_scan_timeout: self.async_scan_timeout, }) diff --git a/sds/src/scanner/test/async_rule.rs b/sds/src/scanner/test/async_rule.rs index cca7d491..cec1b98e 100644 --- a/sds/src/scanner/test/async_rule.rs +++ b/sds/src/scanner/test/async_rule.rs @@ -65,8 +65,8 @@ async fn run_async_rule() { // async scan let mut input = "this is a secret with random data".to_owned(); - let matched_rules = scanner.scan_async(&mut input).await.unwrap(); - assert_eq!(matched_rules.len(), 1); + let result = scanner.scan_async(&mut input).await.unwrap(); + assert_eq!(result.matches.len(), 1); assert_eq!(input, "this is a [REDACTED] with random data"); } @@ -107,8 +107,8 @@ fn async_scan_outside_of_tokio() { let fut = async move { let mut input = "this is a secret with random data".to_owned(); - let matched_rules = scanner.scan_async(&mut input).await.unwrap(); - assert_eq!(matched_rules.len(), 1); + let result = scanner.scan_async(&mut input).await.unwrap(); + assert_eq!(result.matches.len(), 1); assert_eq!(input, "this is a [REDACTED] with random data"); }; diff --git a/sds/src/scanner/test/metrics.rs b/sds/src/scanner/test/metrics.rs index 5886779a..db147337 100644 --- a/sds/src/scanner/test/metrics.rs +++ b/sds/src/scanner/test/metrics.rs @@ -170,3 +170,131 @@ fn test_regex_match_and_included_keyword_same_index() { Some("=firstname.lastname@acme.com&page2".to_string()) ); } + +#[test] +fn should_submit_cpu_duration_metric_non_async() { + use metrics_util::MetricKind::Histogram; + + let recorder = DebuggingRecorder::new(); + let snapshotter = recorder.snapshotter(); + + let content_1 = "bcdef"; + let content_2 = "no match"; + + metrics::with_local_recorder(&recorder, || { + let rule_0 = RootRuleConfig::new(RegexRuleConfig::new(content_1).build()) + .match_action(MatchAction::None); + + let scanner = ScannerBuilder::new(&[rule_0]).build().unwrap(); + let mut content = SimpleEvent::Map(BTreeMap::from([ + ( + "key1".to_string(), + SimpleEvent::String(content_1.to_string()), + ), + ( + "key2".to_string(), + SimpleEvent::String(content_2.to_string()), + ), + ])); + + scanner.scan(&mut content).unwrap(); + }); + + let snapshot = snapshotter.snapshot().into_hashmap(); + + let metric_name = "scanning.cpu_duration"; + let metric_value = snapshot + .get(&CompositeKey::new(Histogram, Key::from_name(metric_name))) + .expect("cpu_duration metric not found"); + + // For non-async rules, CPU duration should be > 0 + match &metric_value.2 { + DebugValue::Histogram(values) => { + assert!(!values.is_empty(), "Histogram should have values"); + assert!( + values[0].into_inner() > 0.0, + "CPU duration should be greater than 0" + ); + } + _ => panic!("Expected Histogram value"), + } +} + +#[test] +fn should_submit_cpu_duration_metric_with_async_rule() { + use crate::scanner::config::RuleConfig; + use crate::scanner::{CompiledRule, CreateScannerError, StringMatchesCtx}; + use metrics_util::MetricKind::Histogram; + use std::sync::Arc; + + // Create a custom async rule that sleeps for 100ms + struct SleepyAsyncRuleConfig; + + struct SleepyAsyncCompiledRule; + + impl CompiledRule for SleepyAsyncCompiledRule { + fn get_string_matches( + &self, + _content: &str, + _path: &Path, + ctx: &mut StringMatchesCtx, + ) -> crate::scanner::RuleResult { + ctx.process_async(|_async_ctx| { + Box::pin(async move { + // Sleep for 100ms to simulate I/O + tokio::time::sleep(tokio::time::Duration::from_millis(100)).await; + Ok(()) + }) + }) + } + } + + impl RuleConfig for SleepyAsyncRuleConfig { + fn convert_to_compiled_rule( + &self, + _rule_index: usize, + _labels: crate::Labels, + ) -> Result, CreateScannerError> { + Ok(Box::new(SleepyAsyncCompiledRule)) + } + } + + let recorder = DebuggingRecorder::new(); + let snapshotter = recorder.snapshotter(); + + metrics::with_local_recorder(&recorder, || { + let rule_0 = RootRuleConfig::new(Arc::new(SleepyAsyncRuleConfig) as Arc) + .match_action(MatchAction::None); + + let scanner = ScannerBuilder::new(&[rule_0]).build().unwrap(); + let mut content = SimpleEvent::Map(BTreeMap::from([( + "key1".to_string(), + SimpleEvent::String("test content".to_string()), + )])); + + // Use scan (which blocks on async internally) + scanner.scan(&mut content).unwrap(); + }); + + let snapshot = snapshotter.snapshot().into_hashmap(); + + let metric_name = "scanning.cpu_duration"; + let metric_value = snapshot + .get(&CompositeKey::new(Histogram, Key::from_name(metric_name))) + .expect("cpu_duration metric not found"); + + // CPU duration should be much less than 100ms since we slept during I/O + match &metric_value.2 { + DebugValue::Histogram(values) => { + assert!(!values.is_empty(), "Histogram should have values"); + // CPU duration should be < 10ms (10_000_000 nanoseconds) + // Since we slept for 100ms, the actual CPU time should be minimal + assert!( + values[0].into_inner() < 10_000_000.0, + "CPU duration should be less than 10ms, got {} ns", + values[0].into_inner() + ); + } + _ => panic!("Expected Histogram value"), + } +}