Skip to content

Commit 1b13d5f

Browse files
authored
chore(ddsketch, dogstatsd): allow for higher sample rates in DSD payloads (#1087)
1 parent 29c7c61 commit 1b13d5f

File tree

5 files changed

+79
-63
lines changed
  • lib
    • ddsketch-agent/src
    • saluki-components/src
    • saluki-core/src/data_model/event/metric/value

5 files changed

+79
-63
lines changed

lib/ddsketch-agent/src/lib.rs

Lines changed: 55 additions & 54 deletions
Original file line numberDiff line numberDiff line change
@@ -130,11 +130,11 @@ impl Bin {
130130
}
131131

132132
#[allow(clippy::cast_possible_truncation)]
133-
fn increment(&mut self, n: u32) -> u32 {
134-
let next = n + u32::from(self.n);
135-
if next > u32::from(MAX_BIN_WIDTH) {
133+
fn increment(&mut self, n: u64) -> u64 {
134+
let next = n + u64::from(self.n);
135+
if next > u64::from(MAX_BIN_WIDTH) {
136136
self.n = MAX_BIN_WIDTH;
137-
return next - u32::from(MAX_BIN_WIDTH);
137+
return next - u64::from(MAX_BIN_WIDTH);
138138
}
139139

140140
// SAFETY: We already know `next` is less than or equal to `MAX_BIN_WIDTH` if we got here, and `MAX_BIN_WIDTH`
@@ -191,7 +191,7 @@ pub struct DDSketch {
191191
bins: SmallVec<[Bin; 4]>,
192192

193193
/// The number of observations within the sketch.
194-
count: u32,
194+
count: u64,
195195

196196
/// The minimum value of all observations within the sketch.
197197
min: f64,
@@ -218,7 +218,7 @@ impl DDSketch {
218218
}
219219

220220
/// Number of samples currently represented by this sketch.
221-
pub fn count(&self) -> u32 {
221+
pub fn count(&self) -> u64 {
222222
self.count
223223
}
224224

@@ -281,7 +281,7 @@ impl DDSketch {
281281
self.bins.clear();
282282
}
283283

284-
fn adjust_basic_stats(&mut self, v: f64, n: u32) {
284+
fn adjust_basic_stats(&mut self, v: f64, n: u64) {
285285
if v < self.min {
286286
self.min = v;
287287
}
@@ -291,21 +291,18 @@ impl DDSketch {
291291
}
292292

293293
self.count += n;
294-
self.sum += v * f64::from(n);
294+
self.sum += v * n as f64;
295295

296296
if n == 1 {
297-
self.avg += (v - self.avg) / f64::from(self.count);
297+
self.avg += (v - self.avg) / self.count as f64;
298298
} else {
299299
// TODO: From the Agent source code, this method apparently loses precision when the
300300
// two averages -- v and self.avg -- are close. Is there a better approach?
301-
self.avg = self.avg + (v - self.avg) * f64::from(n) / f64::from(self.count);
301+
self.avg = self.avg + (v - self.avg) * n as f64 / self.count as f64;
302302
}
303303
}
304304

305-
fn insert_key_counts(&mut self, mut counts: Vec<(i16, u32)>) {
306-
// Counts need to be sorted by key.
307-
counts.sort_unstable_by(|(k1, _), (k2, _)| k1.cmp(k2));
308-
305+
fn insert_key_counts(&mut self, counts: &[(i16, u64)]) {
309306
let mut temp = SmallVec::<[Bin; 4]>::new();
310307

311308
let mut bins_idx = 0;
@@ -335,7 +332,7 @@ impl DDSketch {
335332
bins_idx += 1;
336333
}
337334
Ordering::Equal => {
338-
generate_bins(&mut temp, bin.k, u32::from(bin.n) + kn);
335+
generate_bins(&mut temp, bin.k, u64::from(bin.n) + kn);
339336
bins_idx += 1;
340337
key_idx += 1;
341338
}
@@ -361,6 +358,10 @@ impl DDSketch {
361358
fn insert_keys(&mut self, mut keys: Vec<i16>) {
362359
// Updating more than 4 billion keys would be very very weird and likely indicative of something horribly
363360
// broken.
361+
//
362+
// TODO: I don't actually understand why I wrote this assertion in this way. Either the code can handle
363+
// collapsing values in order to maintain the relative error bounds, or we have to cap it to the maximum allowed
364+
// number of bins. Gotta think about this some more.
364365
assert!(keys.len() <= u32::MAX.try_into().expect("we don't support 16-bit systems"));
365366

366367
keys.sort_unstable();
@@ -395,7 +396,7 @@ impl DDSketch {
395396
}
396397
Ordering::Equal => {
397398
let kn = buf_count_leading_equal(&keys, key_idx);
398-
generate_bins(&mut temp, bin.k, u32::from(bin.n) + kn);
399+
generate_bins(&mut temp, bin.k, u64::from(bin.n) + kn);
399400
bins_idx += 1;
400401
key_idx += kn as usize;
401402
}
@@ -466,16 +467,20 @@ impl DDSketch {
466467
}
467468

468469
/// Inserts a single value into the sketch `n` times.
469-
pub fn insert_n(&mut self, v: f64, n: u32) {
470-
// TODO: this should return a result that makes sure we have enough room to actually add N more samples without
471-
// hitting `self.config.max_count()`
472-
self.adjust_basic_stats(v, n);
470+
pub fn insert_n(&mut self, v: f64, n: u64) {
471+
// TODO: This should return a result that makes sure we have enough room to actually add N more samples without
472+
// hitting `self.config.max_count()`.
473+
if n == 1 {
474+
self.insert(v);
475+
} else {
476+
self.adjust_basic_stats(v, n);
473477

474-
let key = SKETCH_CONFIG.key(v);
475-
self.insert_key_counts(vec![(key, n)]);
478+
let key = SKETCH_CONFIG.key(v);
479+
self.insert_key_counts(&[(key, n)]);
480+
}
476481
}
477482

478-
fn insert_interpolate_bucket(&mut self, lower: f64, upper: f64, count: u32) {
483+
fn insert_interpolate_bucket(&mut self, lower: f64, upper: f64, count: u64) {
479484
// Find the keys for the bins where the lower bound and upper bound would end up, and collect all of the keys in
480485
// between, inclusive.
481486
let lower_key = SKETCH_CONFIG.key(lower);
@@ -495,15 +500,15 @@ impl DDSketch {
495500
// bound for the current sketch bin, which tells us how much of the input count to apply to the current
496501
// sketch bin.
497502
let upper_bound = SKETCH_CONFIG.bin_lower_bound(keys[end_idx]);
498-
let fkn = ((upper_bound - lower_bound) / distance) * f64::from(count);
503+
let fkn = ((upper_bound - lower_bound) / distance) * count as f64;
499504
if fkn > 1.0 {
500505
remainder += fkn - fkn.trunc();
501506
}
502507

503508
// SAFETY: This integer cast is intentional: we want to get the non-fractional part, as we've captured the
504509
// fractional part in the above conditional.
505510
#[allow(clippy::cast_possible_truncation)]
506-
let mut kn = fkn as u32;
511+
let mut kn = fkn as u64;
507512
if remainder > 1.0 {
508513
kn += 1;
509514
remainder -= 1.0;
@@ -532,7 +537,10 @@ impl DDSketch {
532537
key_counts.push((last_key, remaining_count));
533538
}
534539

535-
self.insert_key_counts(key_counts);
540+
// Sort the key counts first, as that's required by `insert_key_counts`.
541+
key_counts.sort_unstable_by(|(k1, _), (k2, _)| k1.cmp(k2));
542+
543+
self.insert_key_counts(&key_counts);
536544
}
537545

538546
/// ## Errors
@@ -562,14 +570,7 @@ impl DDSketch {
562570
lower = upper;
563571
}
564572

565-
// Each bucket should only have the values that fit within that bucket, which is generally enforced at the
566-
// source level by converting from cumulative buckets, or enforced by the internal structures that hold
567-
// bucketed data i.e. Vector's internal `Histogram` data structure used for collecting histograms from
568-
// `metrics`.
569-
let count =
570-
u32::try_from(bucket.count).unwrap_or_else(|_| unreachable!("count range has already been checked."));
571-
572-
self.insert_interpolate_bucket(lower, upper, count);
573+
self.insert_interpolate_bucket(lower, upper, bucket.count);
573574
lower = bucket.upper_limit;
574575
}
575576

@@ -583,7 +584,7 @@ impl DDSketch {
583584
#[allow(dead_code)]
584585
pub(crate) fn insert_raw_bin(&mut self, k: i16, n: u16) {
585586
let v = SKETCH_CONFIG.bin_lower_bound(k);
586-
self.adjust_basic_stats(v, u32::from(n));
587+
self.adjust_basic_stats(v, u64::from(n));
587588
self.bins.push(Bin { k, n });
588589
}
589590

@@ -642,7 +643,7 @@ impl DDSketch {
642643
self.min = other.min;
643644
}
644645
self.sum += other.sum;
645-
self.avg = self.avg + (other.avg - self.avg) * f64::from(other.count) / f64::from(self.count);
646+
self.avg = self.avg + (other.avg - self.avg) * other.count as f64 / self.count as f64;
646647

647648
// Now merge the bins.
648649
let mut temp = SmallVec::<[Bin; 4]>::new();
@@ -662,7 +663,7 @@ impl DDSketch {
662663
generate_bins(
663664
&mut temp,
664665
other_bin.k,
665-
u32::from(other_bin.n) + u32::from(self.bins[bins_idx].n),
666+
u64::from(other_bin.n) + u64::from(self.bins[bins_idx].n),
666667
);
667668
bins_idx += 1;
668669
}
@@ -676,7 +677,7 @@ impl DDSketch {
676677

677678
/// Merges this sketch into the `Dogsketch` Protocol Buffers representation.
678679
pub fn merge_to_dogsketch(&self, dogsketch: &mut Dogsketch) {
679-
dogsketch.set_cnt(i64::from(self.count));
680+
dogsketch.set_cnt(i64::try_from(self.count).unwrap_or(i64::MAX));
680681
dogsketch.set_min(self.min);
681682
dogsketch.set_max(self.max);
682683
dogsketch.set_avg(self.avg);
@@ -733,7 +734,7 @@ impl TryFrom<Dogsketch> for DDSketch {
733734

734735
fn try_from(value: Dogsketch) -> Result<Self, Self::Error> {
735736
let mut sketch = DDSketch {
736-
count: u32::try_from(value.cnt).map_err(|_| "sketch count overflows u32")?,
737+
count: u64::try_from(value.cnt).map_err(|_| "sketch count overflows u64 or is negative")?,
737738
min: value.min,
738739
max: value.max,
739740
avg: value.avg,
@@ -766,13 +767,13 @@ fn float_eq(l_value: f64, r_value: f64) -> bool {
766767
(l_value.is_nan() && r_value.is_nan()) || l_value.approx_eq_ratio(&r_value, RATIO_ERROR)
767768
}
768769

769-
fn rank(count: u32, q: f64) -> f64 {
770-
let rank = q * f64::from(count - 1);
770+
fn rank(count: u64, q: f64) -> f64 {
771+
let rank = q * (count - 1) as f64;
771772
rank.round_ties_even()
772773
}
773774

774775
#[allow(clippy::cast_possible_truncation)]
775-
fn buf_count_leading_equal(keys: &[i16], start_idx: usize) -> u32 {
776+
fn buf_count_leading_equal(keys: &[i16], start_idx: usize) -> u64 {
776777
if start_idx == keys.len() - 1 {
777778
return 1;
778779
}
@@ -783,8 +784,8 @@ fn buf_count_leading_equal(keys: &[i16], start_idx: usize) -> u32 {
783784
}
784785

785786
// SAFETY: We limit the size of the vector (used to provide the slice given to us here) to be no larger than 2^32,
786-
// so we can't exceed u32 here.
787-
(idx - start_idx) as u32
787+
// so we can't exceed u64 here.
788+
(idx - start_idx) as u64
788789
}
789790

790791
fn trim_left(bins: &mut SmallVec<[Bin; 4]>, bin_limit: u16) {
@@ -800,15 +801,15 @@ fn trim_left(bins: &mut SmallVec<[Bin; 4]>, bin_limit: u16) {
800801
let mut overflow = SmallVec::<[Bin; 4]>::new();
801802

802803
for bin in bins.iter().take(num_to_remove) {
803-
missing += u32::from(bin.n);
804+
missing += u64::from(bin.n);
804805

805-
if missing > u32::from(MAX_BIN_WIDTH) {
806+
if missing > u64::from(MAX_BIN_WIDTH) {
806807
overflow.push(Bin {
807808
k: bin.k,
808809
n: MAX_BIN_WIDTH,
809810
});
810811

811-
missing -= u32::from(MAX_BIN_WIDTH);
812+
missing -= u64::from(MAX_BIN_WIDTH);
812813
}
813814
}
814815

@@ -830,12 +831,12 @@ fn trim_left(bins: &mut SmallVec<[Bin; 4]>, bin_limit: u16) {
830831
}
831832

832833
#[allow(clippy::cast_possible_truncation)]
833-
fn generate_bins(bins: &mut SmallVec<[Bin; 4]>, k: i16, n: u32) {
834-
if n < u32::from(MAX_BIN_WIDTH) {
834+
fn generate_bins(bins: &mut SmallVec<[Bin; 4]>, k: i16, n: u64) {
835+
if n < u64::from(MAX_BIN_WIDTH) {
835836
// SAFETY: Cannot truncate `n`, as it's less than a u16 value.
836837
bins.push(Bin { k, n: n as u16 });
837838
} else {
838-
let overflow = n % u32::from(MAX_BIN_WIDTH);
839+
let overflow = n % u64::from(MAX_BIN_WIDTH);
839840
if overflow != 0 {
840841
bins.push(Bin {
841842
k,
@@ -844,7 +845,7 @@ fn generate_bins(bins: &mut SmallVec<[Bin; 4]>, k: i16, n: u32) {
844845
});
845846
}
846847

847-
for _ in 0..(n / u32::from(MAX_BIN_WIDTH)) {
848+
for _ in 0..(n / u64::from(MAX_BIN_WIDTH)) {
848849
bins.push(Bin { k, n: MAX_BIN_WIDTH });
849850
}
850851
}
@@ -1089,7 +1090,7 @@ mod tests {
10891090
enum Value {
10901091
Float(f64),
10911092
Vec(Vec<f64>),
1092-
NFloats(u32, f64),
1093+
NFloats(u64, f64),
10931094
}
10941095
/// ways to insert values into a sketch
10951096
#[derive(Debug)]
@@ -1214,7 +1215,7 @@ mod tests {
12141215
struct Case {
12151216
lower: f64,
12161217
upper: f64,
1217-
count: u32,
1218+
count: u64,
12181219
allowed_err: f64,
12191220
expected: &'static str,
12201221
}
@@ -1227,7 +1228,7 @@ mod tests {
12271228
assert_eq!(actual.bins(), expected.bins());
12281229
compare_sketches(actual, &expected, case.allowed_err);
12291230

1230-
let actual_count: u32 = actual.bins.iter().map(|b| u32::from(b.n)).sum();
1231+
let actual_count: u64 = actual.bins.iter().map(|b| u64::from(b.n)).sum();
12311232
assert_eq!(actual_count, case.count);
12321233
};
12331234

lib/saluki-components/src/encoders/datadog/metrics/mod.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -751,7 +751,7 @@ fn encode_sketch_metric(
751751
// We convert histograms to sketches to be able to write them out in the payload.
752752
let mut ddsketch = DDSketch::default();
753753
for sample in histogram.samples() {
754-
ddsketch.insert_n(sample.value.into_inner(), sample.weight as u32);
754+
ddsketch.insert_n(sample.value.into_inner(), sample.weight);
755755
}
756756

757757
write_dogsketch(output_stream, scratch_buf, packed_scratch_buf, timestamp, &ddsketch)?;

lib/saluki-components/src/sources/dogstatsd/mod.rs

Lines changed: 21 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -125,6 +125,10 @@ const fn default_dogstatsd_permissive_decoding() -> bool {
125125
true
126126
}
127127

128+
const fn default_dogstatsd_minimum_sample_rate() -> f64 {
129+
0.00000025
130+
}
131+
128132
const fn default_enable_payloads_series() -> bool {
129133
true
130134
}
@@ -279,6 +283,21 @@ pub struct DogStatsDConfiguration {
279283
)]
280284
permissive_decoding: bool,
281285

286+
/// The minimum sample rate allowed for metrics.
287+
///
288+
/// When metrics are sent with a sample rate _lower_ than this value then it will be clamped to this value. This is
289+
/// done in order to ensure an upper bound on how many equivalent samples are tracked for the metric, as high sample
290+
/// rates (very small numbers, such as `0.00000001`) can lead to large memory growth.
291+
///
292+
/// A warning log will be emitted when clamping occurs, as this represents an effective loss of metric samples.
293+
///
294+
/// Defaults to `0.00000025`. (4,000,000 samples)
295+
#[serde(
296+
rename = "dogstatsd_minimum_sample_rate",
297+
default = "default_dogstatsd_minimum_sample_rate"
298+
)]
299+
minimum_sample_rate: f64,
300+
282301
/// Whether or not to enable sending serie payloads.
283302
///
284303
/// Defaults to `true`.
@@ -414,7 +433,8 @@ impl SourceBuilder for DogStatsDConfiguration {
414433

415434
let codec_config = DogstatsdCodecConfiguration::default()
416435
.with_timestamps(self.no_aggregation_pipeline_support)
417-
.with_permissive_mode(self.permissive_decoding);
436+
.with_permissive_mode(self.permissive_decoding)
437+
.with_minimum_sample_rate(self.minimum_sample_rate);
418438

419439
let codec = DogstatsdCodec::from_configuration(codec_config);
420440

lib/saluki-components/src/transforms/aggregate/mod.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -709,7 +709,7 @@ async fn transform_and_push_metric(
709709
.map(|(ts, hist)| {
710710
let mut sketch = DDSketch::default();
711711
for sample in hist.samples() {
712-
sketch.insert_n(sample.value.into_inner(), sample.weight as u32);
712+
sketch.insert_n(sample.value.into_inner(), sample.weight);
713713
}
714714
(ts, sketch)
715715
})

lib/saluki-core/src/data_model/event/metric/value/mod.rs

Lines changed: 1 addition & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -305,16 +305,11 @@ impl MetricValues {
305305
I: Iterator<Item = Result<f64, E>>,
306306
{
307307
let sample_rate = sample_rate.unwrap_or(SampleRate::unsampled());
308-
let capped_sample_rate = u32::try_from(sample_rate.weight()).unwrap_or(u32::MAX);
309308

310309
let mut sketch = DDSketch::default();
311310
for value in iter {
312311
let value = value?;
313-
if capped_sample_rate == 1 {
314-
sketch.insert(value);
315-
} else {
316-
sketch.insert_n(value, capped_sample_rate);
317-
}
312+
sketch.insert_n(value, sample_rate.weight());
318313
}
319314
Ok(Self::Distribution(sketch.into()))
320315
}

0 commit comments

Comments
 (0)