Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions sds/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,8 +45,8 @@ pub use scanner::shared_pool::{SharedPool, SharedPoolGuard};
pub use scanner::suppression::Suppressions;
pub use scanner::{
CompiledRule, MatchEmitter, Precedence, RootCompiledRule, RootRuleConfig, RuleResult,
RuleStatus, ScanOptionBuilder, Scanner, ScannerBuilder, SharedData, StringMatch,
StringMatchesCtx,
RuleStatus, ScanMetrics, ScanOptionBuilder, ScanResult, Scanner, ScannerBuilder, SharedData,
StringMatch, StringMatchesCtx,
config::RuleConfig,
error::{CreateScannerError, ScannerError},
regex_rule::config::{
Expand Down
6 changes: 4 additions & 2 deletions sds/src/scanner/metrics.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use crate::Labels;
use metrics::{Counter, counter};
use metrics::{Counter, Histogram, counter, histogram};

pub struct RuleMetrics {
pub false_positive_excluded_attributes: Counter,
Expand All @@ -21,15 +21,17 @@ pub struct ScannerMetrics {
pub duration_ns: Counter,
pub match_count: Counter,
pub suppressed_match_count: Counter,
pub cpu_duration: Histogram,
}

impl ScannerMetrics {
pub fn new(labels: &Labels) -> Self {
pub fn new(labels: &Labels, highcard_labels: &Labels) -> Self {
ScannerMetrics {
num_scanned_events: counter!("scanned_events", labels.clone()),
duration_ns: counter!("scanning.duration", labels.clone()),
match_count: counter!("scanning.match_count", labels.clone()),
suppressed_match_count: counter!("scanning.suppressed_match_count", labels.clone()),
cpu_duration: histogram!("scanning.cpu_duration", highcard_labels.clone()),
}
}
}
76 changes: 60 additions & 16 deletions sds/src/scanner/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -257,14 +257,17 @@ impl StringMatchesCtx<'_> {
// The future is spawned onto the tokio runtime immediately so it starts running
// in the background
let fut = TOKIO_RUNTIME.spawn(async move {
let start = Instant::now();
let mut ctx = AsyncStringMatchesCtx {
rule_matches: vec![],
};
(func)(&mut ctx).await?;
let io_duration = start.elapsed();

Ok(AsyncRuleInfo {
rule_index,
rule_matches: ctx.rule_matches,
io_duration,
})
});

Expand Down Expand Up @@ -299,6 +302,20 @@ pub struct PendingRuleJob {
/// Successful outcome of one async rule job, built at the end of the task
/// that is spawned on the tokio runtime for that rule.
pub struct AsyncRuleInfo {
// Rule index captured by the spawning code and carried through unchanged.
rule_index: usize,
// Matches the async callback accumulated in its `AsyncStringMatchesCtx`.
rule_matches: Vec<StringMatch>,
// Wall-clock time from the start of the spawned task until the rule's
// async callback returned successfully.
io_duration: Duration,
}

/// Timing information collected for a single scan.
///
/// All durations are wall-clock measurements taken with `Instant`.
#[derive(Debug, Clone, PartialEq, Eq, Default)]
pub struct ScanMetrics {
    /// Time elapsed from the start of the scan until its results were ready.
    pub total_duration: Duration,
    /// Sum of the durations reported by every async rule job awaited during
    /// the scan.
    ///
    /// Each job is timed inside its own spawned task. Because the jobs are
    /// started in the background before they are awaited, overlapping jobs
    /// may make this sum larger than `total_duration`.
    pub io_duration: Duration,
    /// Number of async rule jobs that were awaited during the scan.
    pub num_async_rules: usize,
}

impl ScanMetrics {
    /// Returns the portion of `total_duration` not attributed to async I/O.
    ///
    /// The subtraction saturates at zero, so the result is well defined even
    /// when the summed `io_duration` exceeds `total_duration`.
    #[must_use]
    pub fn cpu_duration(&self) -> Duration {
        self.total_duration.saturating_sub(self.io_duration)
    }
}

/// Value returned by the async scan entry points: the rule matches together
/// with the timing metrics gathered for that scan.
#[derive(Debug)]
pub struct ScanResult {
/// Rule matches produced by the scan.
pub matches: Vec<RuleMatch>,
/// Timing information recorded while the scan ran.
pub metrics: ScanMetrics,
}

/// A rule result that cannot be async
Expand Down Expand Up @@ -443,6 +460,7 @@ pub struct Scanner {
scanner_features: ScannerFeatures,
metrics: ScannerMetrics,
labels: Labels,
pub highcard_labels: Labels,
match_validators_per_type: AHashMap<InternalMatchValidationType, Box<dyn MatchValidator>>,
per_scanner_data: SharedData,
async_scan_timeout: Duration,
Expand Down Expand Up @@ -470,16 +488,13 @@ impl Scanner {
event: &mut E,
options: ScanOptions,
) -> Result<Vec<RuleMatch>, ScannerError> {
block_on(self.internal_scan_with_metrics(event, options))
block_on(self.internal_scan_with_metrics(event, options)).map(|result| result.matches)
}

// This function scans the given event with the rules configured in the scanner.
// The event parameter is a mutable reference to the event that should be scanned (its type must implement the Event trait).
// The return value is a list of RuleMatch objects, which contain information about the matches that were found.
pub async fn scan_async<E: Event>(
&self,
event: &mut E,
) -> Result<Vec<RuleMatch>, ScannerError> {
// The return value is a ScanResult containing matches and timing metrics.
pub async fn scan_async<E: Event>(&self, event: &mut E) -> Result<ScanResult, ScannerError> {
self.scan_async_with_options(event, ScanOptions::default())
.await
}
Expand All @@ -488,7 +503,7 @@ impl Scanner {
&self,
event: &mut E,
options: ScanOptions,
) -> Result<Vec<RuleMatch>, ScannerError> {
) -> Result<ScanResult, ScannerError> {
let fut = self.internal_scan_with_metrics(event, options);

// The sleep from the timeout requires being in a tokio context
Expand Down Expand Up @@ -520,18 +535,36 @@ impl Scanner {
&self,
event: &mut E,
options: ScanOptions,
) -> Result<Vec<RuleMatch>, ScannerError> {
) -> Result<ScanResult, ScannerError> {
let start = Instant::now();
let result = self.internal_scan(event, options).await;
match &result {
Ok(rule_matches) => {
self.record_metrics(rule_matches, start);
match result {
Ok((rule_matches, io_duration, num_async_rules)) => {
self.record_metrics(&rule_matches, start);
let total_duration = start.elapsed();

// Calculate CPU duration by subtracting I/O wait time from total duration
let cpu_duration = total_duration.saturating_sub(io_duration);

// Record CPU duration histogram in nanoseconds
self.metrics
.cpu_duration
.record(cpu_duration.as_nanos() as f64);

Ok(ScanResult {
matches: rule_matches,
metrics: ScanMetrics {
total_duration,
io_duration,
num_async_rules,
},
})
}
Err(_) => {
Err(e) => {
self.record_metrics(&[], start);
Err(e)
}
}
result
}

fn process_rule_matches<E: Event>(
Expand Down Expand Up @@ -611,7 +644,7 @@ impl Scanner {
&self,
event: &mut E,
options: ScanOptions,
) -> Result<Vec<RuleMatch>, ScannerError> {
) -> Result<(Vec<RuleMatch>, Duration, usize), ScannerError> {
// If validation is requested, we need to collect match content even if the scanner
// wasn't originally configured to return matches
let need_match_content = self.scanner_features.return_matches || options.validate_matches;
Expand Down Expand Up @@ -639,8 +672,11 @@ impl Scanner {

// The async jobs were already spawned on the tokio runtime, so the
// results just need to be collected
let num_async_jobs = async_jobs.len();
let mut total_io_duration = Duration::ZERO;
for job in async_jobs {
let rule_info = job.fut.await.unwrap()?;
total_io_duration += rule_info.io_duration;
rule_matches.push_async_matches(
&job.path,
rule_info
Expand All @@ -664,7 +700,7 @@ impl Scanner {
self.validate_matches(&mut output_rule_matches);
}

Ok(output_rule_matches)
Ok((output_rule_matches, total_io_duration, num_async_jobs))
}

pub fn suppress_matches<E: Encoding>(
Expand Down Expand Up @@ -926,6 +962,7 @@ impl Drop for Scanner {
pub struct ScannerBuilder<'a> {
rules: &'a [RootRuleConfig<Arc<dyn RuleConfig>>],
labels: Labels,
highcard_labels: Labels,
scanner_features: ScannerFeatures,
async_scan_timeout: Duration,
}
Expand All @@ -935,6 +972,7 @@ impl ScannerBuilder<'_> {
ScannerBuilder {
rules,
labels: Labels::empty(),
highcard_labels: Labels::empty(),
scanner_features: ScannerFeatures::default(),
async_scan_timeout: Duration::from_secs(60 * 5),
}
Expand All @@ -945,6 +983,11 @@ impl ScannerBuilder<'_> {
self
}

/// Replaces the builder's `highcard_labels` (empty by default) and returns
/// the builder so calls can be chained.
///
/// When the scanner is built these labels are handed to
/// `ScannerMetrics::new` as its second argument, where they tag the
/// `scanning.cpu_duration` histogram.
pub fn highcard_labels(mut self, labels: Labels) -> Self {
self.highcard_labels = labels;
self
}

pub fn with_async_scan_timeout(mut self, duration: Duration) -> Self {
self.async_scan_timeout = duration;
self
Expand Down Expand Up @@ -1065,9 +1108,10 @@ impl ScannerBuilder<'_> {
rules: compiled_rules,
scoped_ruleset,
scanner_features: self.scanner_features,
metrics: ScannerMetrics::new(&self.labels),
metrics: ScannerMetrics::new(&self.labels, &self.highcard_labels),
match_validators_per_type,
labels: self.labels,
highcard_labels: self.highcard_labels,
per_scanner_data,
async_scan_timeout: self.async_scan_timeout,
})
Expand Down
8 changes: 4 additions & 4 deletions sds/src/scanner/test/async_rule.rs
Original file line number Diff line number Diff line change
Expand Up @@ -65,8 +65,8 @@ async fn run_async_rule() {

// async scan
let mut input = "this is a secret with random data".to_owned();
let matched_rules = scanner.scan_async(&mut input).await.unwrap();
assert_eq!(matched_rules.len(), 1);
let result = scanner.scan_async(&mut input).await.unwrap();
assert_eq!(result.matches.len(), 1);
assert_eq!(input, "this is a [REDACTED] with random data");
}

Expand Down Expand Up @@ -107,8 +107,8 @@ fn async_scan_outside_of_tokio() {

let fut = async move {
let mut input = "this is a secret with random data".to_owned();
let matched_rules = scanner.scan_async(&mut input).await.unwrap();
assert_eq!(matched_rules.len(), 1);
let result = scanner.scan_async(&mut input).await.unwrap();
assert_eq!(result.matches.len(), 1);
assert_eq!(input, "this is a [REDACTED] with random data");
};

Expand Down
128 changes: 128 additions & 0 deletions sds/src/scanner/test/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -170,3 +170,131 @@ fn test_regex_match_and_included_keyword_same_index() {
Some("[email protected]&page2".to_string())
);
}

/// With only a regex rule configured (the non-async case), a scan must record
/// a `scanning.cpu_duration` histogram sample that is strictly positive.
#[test]
fn should_submit_cpu_duration_metric_non_async() {
    use metrics_util::MetricKind::Histogram;

    let matching_text = "bcdef";
    let other_text = "no match";

    let recorder = DebuggingRecorder::new();
    let snapshotter = recorder.snapshotter();

    // Metrics emitted inside the closure are captured by the local recorder.
    metrics::with_local_recorder(&recorder, || {
        let rule = RootRuleConfig::new(RegexRuleConfig::new(matching_text).build())
            .match_action(MatchAction::None);
        let scanner = ScannerBuilder::new(&[rule]).build().unwrap();

        let fields = BTreeMap::from([
            ("key1".to_string(), SimpleEvent::String(matching_text.to_string())),
            ("key2".to_string(), SimpleEvent::String(other_text.to_string())),
        ]);
        let mut event = SimpleEvent::Map(fields);

        scanner.scan(&mut event).unwrap();
    });

    let recorded = snapshotter.snapshot().into_hashmap();
    let histogram_key = CompositeKey::new(Histogram, Key::from_name("scanning.cpu_duration"));
    let entry = recorded
        .get(&histogram_key)
        .expect("cpu_duration metric not found");

    // For non-async rules, CPU duration should be > 0
    let DebugValue::Histogram(samples) = &entry.2 else {
        panic!("Expected Histogram value");
    };
    assert!(!samples.is_empty(), "Histogram should have values");
    assert!(
        samples[0].into_inner() > 0.0,
        "CPU duration should be greater than 0"
    );
}

/// Checks that time spent waiting inside an async rule is excluded from the
/// `scanning.cpu_duration` histogram.
///
/// The fake rule sleeps for `SIMULATED_IO`. If that wait were not subtracted,
/// the recorded sample would be at least `SIMULATED_IO`, so asserting that the
/// sample stays below half of it detects the regression while leaving room for
/// scheduler jitter on a busy machine.
#[test]
fn should_submit_cpu_duration_metric_with_async_rule() {
    use crate::scanner::config::RuleConfig;
    use crate::scanner::{CompiledRule, CreateScannerError, StringMatchesCtx};
    use metrics_util::MetricKind::Histogram;
    use std::sync::Arc;
    use std::time::Duration;

    // How long the fake rule waits to simulate I/O; also drives the assertion bound.
    const SIMULATED_IO: Duration = Duration::from_millis(100);

    // Custom async rule whose only work is sleeping for `SIMULATED_IO`.
    struct SleepyAsyncRuleConfig;

    struct SleepyAsyncCompiledRule;

    impl CompiledRule for SleepyAsyncCompiledRule {
        fn get_string_matches(
            &self,
            _content: &str,
            _path: &Path,
            ctx: &mut StringMatchesCtx,
        ) -> crate::scanner::RuleResult {
            ctx.process_async(|_async_ctx| {
                Box::pin(async move {
                    // Non-blocking sleep standing in for an I/O wait.
                    tokio::time::sleep(SIMULATED_IO).await;
                    Ok(())
                })
            })
        }
    }

    impl RuleConfig for SleepyAsyncRuleConfig {
        fn convert_to_compiled_rule(
            &self,
            _rule_index: usize,
            _labels: crate::Labels,
        ) -> Result<Box<dyn CompiledRule>, CreateScannerError> {
            Ok(Box::new(SleepyAsyncCompiledRule))
        }
    }

    let recorder = DebuggingRecorder::new();
    let snapshotter = recorder.snapshotter();

    metrics::with_local_recorder(&recorder, || {
        let rule_0 = RootRuleConfig::new(Arc::new(SleepyAsyncRuleConfig) as Arc<dyn RuleConfig>)
            .match_action(MatchAction::None);

        let scanner = ScannerBuilder::new(&[rule_0]).build().unwrap();
        let mut content = SimpleEvent::Map(BTreeMap::from([(
            "key1".to_string(),
            SimpleEvent::String("test content".to_string()),
        )]));

        // Use scan (which blocks on async internally)
        scanner.scan(&mut content).unwrap();
    });

    let snapshot = snapshotter.snapshot().into_hashmap();

    let metric_name = "scanning.cpu_duration";
    let metric_value = snapshot
        .get(&CompositeKey::new(Histogram, Key::from_name(metric_name)))
        .expect("cpu_duration metric not found");

    // The histogram is recorded in nanoseconds.
    let upper_bound_ns = (SIMULATED_IO / 2).as_nanos() as f64;

    match &metric_value.2 {
        DebugValue::Histogram(values) => {
            assert!(!values.is_empty(), "Histogram should have values");
            let cpu_ns = values[0].into_inner();
            assert!(
                cpu_ns < upper_bound_ns,
                "CPU duration should be less than {} ns (half the simulated I/O wait), got {} ns",
                upper_bound_ns,
                cpu_ns
            );
        }
        _ => panic!("Expected Histogram value"),
    }
}