@@ -9,6 +9,7 @@ use nom::{
99} ;
1010use saluki_context:: { origin:: OriginTagCardinality , tags:: RawTags } ;
1111use saluki_core:: data_model:: event:: metric:: * ;
12+ use tracing:: warn;
1213
1314use super :: { helpers:: * , DogstatsdCodecConfiguration , NomParserError } ;
1415
@@ -72,7 +73,9 @@ pub fn parse_dogstatsd_metric<'a>(
7273 // point downstream to calculate the true metric value.
7374 b'@' => {
7475 let ( _, sample_rate) =
75- all_consuming ( preceded ( tag ( "@" ) , map_res ( double, SampleRate :: try_from) ) ) . parse ( chunk) ?;
76+ all_consuming ( preceded ( tag ( "@" ) , map_res ( double, sample_rate ( metric_name, config) ) ) )
77+ . parse ( chunk) ?;
78+
7679 maybe_sample_rate = Some ( sample_rate) ;
7780 }
7881 // Tags: additional tags to be added to the metric.
@@ -156,6 +159,24 @@ fn permissive_metric_name(input: &[u8]) -> IResult<&[u8], &str> {
156159 . parse ( input)
157160}
158161
162+ #[ inline]
163+ fn sample_rate < ' a > (
164+ metric_name : & ' a str , config : & ' a DogstatsdCodecConfiguration ,
165+ ) -> impl Fn ( f64 ) -> Result < SampleRate , & ' static str > + ' a {
166+ let minimum_sample_rate = config. minimum_sample_rate ;
167+
168+ move |mut raw_sample_rate| {
169+ if raw_sample_rate < minimum_sample_rate {
170+ raw_sample_rate = minimum_sample_rate;
171+ warn ! (
172+ "Sample rate for metric '{}' is below minimum of {}. Clamping to minimum." ,
173+ metric_name, minimum_sample_rate
174+ ) ;
175+ }
176+ SampleRate :: try_from ( raw_sample_rate)
177+ }
178+ }
179+
159180#[ inline]
160181fn raw_metric_values ( input : & [ u8 ] ) -> IResult < & [ u8 ] , ( MetricType , & [ u8 ] ) > {
161182 let ( remaining, raw_values) = terminated ( take_while1 ( |b| b != b'|' ) , tag ( "|" ) ) . parse ( input) ?;
@@ -568,6 +589,47 @@ mod tests {
568589 }
569590 }
570591
592+ #[ test]
593+ fn minimum_sample_rate ( ) {
594+ // Sample rate of 0.01 should lead to a count of 100 when handling a single value.
595+ let minimum_sample_rate = SampleRate :: try_from ( 0.01 ) . unwrap ( ) ;
596+ let config = DogstatsdCodecConfiguration :: default ( ) . with_minimum_sample_rate ( minimum_sample_rate. rate ( ) ) ;
597+
598+ let cases = [
599+ // Worst case scenario: sample rate of zero, or "infinitely sampled".
600+ "test:1|d|@0" . to_string ( ) ,
601+ // Bunch of values with different sample rates all below the minimum sample rate.
602+ "test:1|d|@0.001" . to_string ( ) ,
603+ "test:1|d|@0.0005" . to_string ( ) ,
604+ "test:1|d|@0.00001" . to_string ( ) ,
605+ // Control: use the minimum sample rate.
606+ format ! ( "test:1|d|@{}" , minimum_sample_rate. rate( ) ) ,
607+ // Bunch of values with _greater_ sampling rates than the minimum.
608+ "test:1|d|@0.1" . to_string ( ) ,
609+ "test:1|d|@0.5" . to_string ( ) ,
610+ "test:1|d" . to_string ( ) ,
611+ ] ;
612+
613+ for input in cases {
614+ let metric = parse_dsd_metric_with_conf ( input. as_bytes ( ) , & config)
615+ . expect ( "Should not fail to parse metric." )
616+ . expect ( "Metric should be present." ) ;
617+
618+ let sketch = match metric. values ( ) {
619+ MetricValues :: Distribution ( points) => {
620+ points
621+ . into_iter ( )
622+ . next ( )
623+ . expect ( "Should have at least one sketch point." )
624+ . 1
625+ }
626+ _ => panic ! ( "Unexpected metric type." ) ,
627+ } ;
628+
629+ assert ! ( sketch. count( ) as u64 <= minimum_sample_rate. weight( ) ) ;
630+ }
631+ }
632+
571633 proptest ! {
572634 #![ proptest_config( ProptestConfig :: with_cases( 1000 ) ) ]
573635 #[ test]
0 commit comments