109 changes: 55 additions & 54 deletions lib/ddsketch-agent/src/lib.rs
@@ -130,11 +130,11 @@ impl Bin {
}

#[allow(clippy::cast_possible_truncation)]
fn increment(&mut self, n: u32) -> u32 {
let next = n + u32::from(self.n);
if next > u32::from(MAX_BIN_WIDTH) {
fn increment(&mut self, n: u64) -> u64 {
let next = n + u64::from(self.n);
if next > u64::from(MAX_BIN_WIDTH) {
self.n = MAX_BIN_WIDTH;
return next - u32::from(MAX_BIN_WIDTH);
return next - u64::from(MAX_BIN_WIDTH);
}

// SAFETY: We already know `next` is less than or equal to `MAX_BIN_WIDTH` if we got here, and `MAX_BIN_WIDTH`
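For context, the contract here is that a bin holds at most `MAX_BIN_WIDTH` observations, and whatever does not fit is returned to the caller so it can be spread across additional bins. A minimal standalone sketch of that saturate-and-carry behavior (a hypothetical free function, assuming `MAX_BIN_WIDTH` is the `u16` maximum; the actual constant is not shown in this diff):

const MAX_BIN_WIDTH: u16 = u16::MAX; // assumed value, for illustration only

// Add `n` observations to a bin, saturating at MAX_BIN_WIDTH and returning the
// count that did not fit so the caller can spill it into additional bins.
fn saturating_increment(bin_n: &mut u16, n: u64) -> u64 {
    let next = n + u64::from(*bin_n);
    if next > u64::from(MAX_BIN_WIDTH) {
        *bin_n = MAX_BIN_WIDTH;
        return next - u64::from(MAX_BIN_WIDTH);
    }
    // `next` fits in a u16 here because it is <= MAX_BIN_WIDTH.
    *bin_n = next as u16;
    0
}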
@@ -191,7 +191,7 @@ pub struct DDSketch {
bins: SmallVec<[Bin; 4]>,

/// The number of observations within the sketch.
count: u32,
count: u64,

/// The minimum value of all observations within the sketch.
min: f64,
@@ -218,7 +218,7 @@ impl DDSketch {
}

/// Number of samples currently represented by this sketch.
pub fn count(&self) -> u32 {
pub fn count(&self) -> u64 {
self.count
}

@@ -281,7 +281,7 @@ impl DDSketch {
self.bins.clear();
}

fn adjust_basic_stats(&mut self, v: f64, n: u32) {
fn adjust_basic_stats(&mut self, v: f64, n: u64) {
if v < self.min {
self.min = v;
}
Expand All @@ -291,21 +291,18 @@ impl DDSketch {
}

self.count += n;
self.sum += v * f64::from(n);
self.sum += v * n as f64;

if n == 1 {
self.avg += (v - self.avg) / f64::from(self.count);
self.avg += (v - self.avg) / self.count as f64;
} else {
// TODO: From the Agent source code, this method apparently loses precision when the
// two averages -- v and self.avg -- are close. Is there a better approach?
self.avg = self.avg + (v - self.avg) * f64::from(n) / f64::from(self.count);
self.avg = self.avg + (v - self.avg) * n as f64 / self.count as f64;
}
}

fn insert_key_counts(&mut self, mut counts: Vec<(i16, u32)>) {
// Counts need to be sorted by key.
counts.sort_unstable_by(|(k1, _), (k2, _)| k1.cmp(k2));

fn insert_key_counts(&mut self, counts: &[(i16, u64)]) {
let mut temp = SmallVec::<[Bin; 4]>::new();

let mut bins_idx = 0;
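Regarding the precision TODO above: the update is the standard incremental weighted mean, applied after `count` has already been bumped by `n`. A small illustrative sketch (not this crate's API) with one worked value:

// Fold `n` copies of `v` into a running average; `count` must already include
// the `n` new observations, matching the order used in `adjust_basic_stats`.
fn update_mean(avg: f64, v: f64, n: u64, count: u64) -> f64 {
    avg + (v - avg) * n as f64 / count as f64
}

fn main() {
    // Starting from avg = 10.0 over 2 samples, folding in 4 samples of 1.0:
    // 10.0 + (1.0 - 10.0) * 4.0 / 6.0 = 4.0, the exact mean of [10, 10, 1, 1, 1, 1].
    assert_eq!(update_mean(10.0, 1.0, 4, 6), 4.0);
}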
@@ -335,7 +332,7 @@ impl DDSketch {
bins_idx += 1;
}
Ordering::Equal => {
generate_bins(&mut temp, bin.k, u32::from(bin.n) + kn);
generate_bins(&mut temp, bin.k, u64::from(bin.n) + kn);
bins_idx += 1;
key_idx += 1;
}
@@ -361,6 +358,10 @@ impl DDSketch {
fn insert_keys(&mut self, mut keys: Vec<i16>) {
// Updating more than 4 billion keys would be very very weird and likely indicative of something horribly
// broken.
//
// TODO: I don't actually understand why I wrote this assertion in this way. Either the code can handle
// collapsing values in order to maintain the relative error bounds, or we have to cap it to the maximum allowed
// number of bins. Gotta think about this some more.
assert!(keys.len() <= u32::MAX.try_into().expect("we don't support 16-bit systems"));

keys.sort_unstable();
@@ -395,7 +396,7 @@ impl DDSketch {
}
Ordering::Equal => {
let kn = buf_count_leading_equal(&keys, key_idx);
generate_bins(&mut temp, bin.k, u32::from(bin.n) + kn);
generate_bins(&mut temp, bin.k, u64::from(bin.n) + kn);
bins_idx += 1;
key_idx += kn as usize;
}
@@ -466,16 +467,20 @@ impl DDSketch {
}

/// Inserts a single value into the sketch `n` times.
pub fn insert_n(&mut self, v: f64, n: u32) {
// TODO: this should return a result that makes sure we have enough room to actually add N more samples without
// hitting `self.config.max_count()`
self.adjust_basic_stats(v, n);
pub fn insert_n(&mut self, v: f64, n: u64) {
// TODO: This should return a result that makes sure we have enough room to actually add N more samples without
// hitting `self.config.max_count()`.
if n == 1 {
self.insert(v);
} else {
self.adjust_basic_stats(v, n);

let key = SKETCH_CONFIG.key(v);
self.insert_key_counts(vec![(key, n)]);
let key = SKETCH_CONFIG.key(v);
self.insert_key_counts(&[(key, n)]);
}
}
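A small usage sketch of the widened `insert_n` (assuming the crate is importable as `ddsketch_agent`, matching the library path above):

use ddsketch_agent::DDSketch;

fn main() {
    let mut sketch = DDSketch::default();
    sketch.insert_n(1.0, 1);             // n == 1 takes the plain insert() path
    sketch.insert_n(2.5, 5_000_000_000); // weights above u32::MAX are now representable
    assert_eq!(sketch.count(), 5_000_000_001);
}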

fn insert_interpolate_bucket(&mut self, lower: f64, upper: f64, count: u32) {
fn insert_interpolate_bucket(&mut self, lower: f64, upper: f64, count: u64) {
// Find the keys for the bins where the lower bound and upper bound would end up, and collect all of the keys in
// between, inclusive.
let lower_key = SKETCH_CONFIG.key(lower);
@@ -495,15 +500,15 @@ impl DDSketch {
// bound for the current sketch bin, which tells us how much of the input count to apply to the current
// sketch bin.
let upper_bound = SKETCH_CONFIG.bin_lower_bound(keys[end_idx]);
let fkn = ((upper_bound - lower_bound) / distance) * f64::from(count);
let fkn = ((upper_bound - lower_bound) / distance) * count as f64;
if fkn > 1.0 {
remainder += fkn - fkn.trunc();
}

// SAFETY: This integer cast is intentional: we want to get the non-fractional part, as we've captured the
// fractional part in the above conditional.
#[allow(clippy::cast_possible_truncation)]
let mut kn = fkn as u32;
let mut kn = fkn as u64;
if remainder > 1.0 {
kn += 1;
remainder -= 1.0;
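Pulling the proportional-allocation logic above out into a standalone sketch may make the remainder handling easier to follow. This is illustrative only: the real code derives each sub-range boundary from the sketch bin bounds (`SKETCH_CONFIG.bin_lower_bound`) and assigns whatever count is left over to the final key, so no observations are dropped.

// Distribute `count` across contiguous sub-ranges of [lower, upper), proportionally
// to each sub-range's width, carrying the fractional part forward so that the
// per-range counts stay integral.
fn distribute(count: u64, lower: f64, upper: f64, cut_points: &[f64]) -> Vec<u64> {
    let distance = upper - lower;
    let mut remainder = 0.0;
    let mut start = lower;
    let mut out = Vec::with_capacity(cut_points.len());
    for &end in cut_points {
        let fkn = ((end - start) / distance) * count as f64;
        if fkn > 1.0 {
            remainder += fkn - fkn.trunc();
        }
        let mut kn = fkn as u64;
        if remainder > 1.0 {
            kn += 1;
            remainder -= 1.0;
        }
        out.push(kn);
        start = end;
    }
    out
}

// e.g. distribute(10, 0.0, 1.0, &[0.25, 0.5, 1.0]) yields [2, 2, 5]; the one
// observation left over is what the real code handles by assigning the
// remaining count to the last key.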
@@ -532,7 +537,10 @@ impl DDSketch {
key_counts.push((last_key, remaining_count));
}

self.insert_key_counts(key_counts);
// Sort the key counts first, as that's required by `insert_key_counts`.
key_counts.sort_unstable_by(|(k1, _), (k2, _)| k1.cmp(k2));

self.insert_key_counts(&key_counts);
}

/// ## Errors
@@ -562,14 +570,7 @@ impl DDSketch {
lower = upper;
}

// Each bucket should only have the values that fit within that bucket, which is generally enforced at the
// source level by converting from cumulative buckets, or enforced by the internal structures that hold
// bucketed data i.e. Vector's internal `Histogram` data structure used for collecting histograms from
// `metrics`.
let count =
u32::try_from(bucket.count).unwrap_or_else(|_| unreachable!("count range has already been checked."));

self.insert_interpolate_bucket(lower, upper, count);
self.insert_interpolate_bucket(lower, upper, bucket.count);
lower = bucket.upper_limit;
}

@@ -583,7 +584,7 @@ impl DDSketch {
#[allow(dead_code)]
pub(crate) fn insert_raw_bin(&mut self, k: i16, n: u16) {
let v = SKETCH_CONFIG.bin_lower_bound(k);
self.adjust_basic_stats(v, u32::from(n));
self.adjust_basic_stats(v, u64::from(n));
self.bins.push(Bin { k, n });
}

@@ -642,7 +643,7 @@ impl DDSketch {
self.min = other.min;
}
self.sum += other.sum;
self.avg = self.avg + (other.avg - self.avg) * f64::from(other.count) / f64::from(self.count);
self.avg = self.avg + (other.avg - self.avg) * other.count as f64 / self.count as f64;

// Now merge the bins.
let mut temp = SmallVec::<[Bin; 4]>::new();
@@ -662,7 +663,7 @@ impl DDSketch {
generate_bins(
&mut temp,
other_bin.k,
u32::from(other_bin.n) + u32::from(self.bins[bins_idx].n),
u64::from(other_bin.n) + u64::from(self.bins[bins_idx].n),
);
bins_idx += 1;
}
@@ -676,7 +677,7 @@

/// Merges this sketch into the `Dogsketch` Protocol Buffers representation.
pub fn merge_to_dogsketch(&self, dogsketch: &mut Dogsketch) {
dogsketch.set_cnt(i64::from(self.count));
dogsketch.set_cnt(i64::try_from(self.count).unwrap_or(i64::MAX));
dogsketch.set_min(self.min);
dogsketch.set_max(self.max);
dogsketch.set_avg(self.avg);
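Since `cnt` on the protobuf `Dogsketch` is an `i64` while the sketch count is now a `u64`, the `set_cnt` call above saturates rather than converting infallibly. A tiny illustration of the idiom:

// Saturating u64 -> i64 conversion: values above i64::MAX are clamped.
fn clamp_count(count: u64) -> i64 {
    i64::try_from(count).unwrap_or(i64::MAX)
}

fn main() {
    assert_eq!(clamp_count(42), 42);
    assert_eq!(clamp_count(u64::MAX), i64::MAX);
}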
@@ -733,7 +734,7 @@ impl TryFrom<Dogsketch> for DDSketch {

fn try_from(value: Dogsketch) -> Result<Self, Self::Error> {
let mut sketch = DDSketch {
count: u32::try_from(value.cnt).map_err(|_| "sketch count overflows u32")?,
count: u64::try_from(value.cnt).map_err(|_| "sketch count overflows u64 or is negative")?,
min: value.min,
max: value.max,
avg: value.avg,
@@ -766,13 +767,13 @@ fn float_eq(l_value: f64, r_value: f64) -> bool {
(l_value.is_nan() && r_value.is_nan()) || l_value.approx_eq_ratio(&r_value, RATIO_ERROR)
}

fn rank(count: u32, q: f64) -> f64 {
let rank = q * f64::from(count - 1);
fn rank(count: u64, q: f64) -> f64 {
let rank = q * (count - 1) as f64;
rank.round_ties_even()
}
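A couple of worked values for the fractional rank computed above (illustrative only; `round_ties_even` rounds exact halves toward the even integer):

// rank(10, 0.99) = round_ties_even(0.99 * 9.0) = round_ties_even(8.91) = 9.0
// rank(10, 0.50) = round_ties_even(0.50 * 9.0) = round_ties_even(4.5)  = 4.0 (tie rounds to even)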

#[allow(clippy::cast_possible_truncation)]
fn buf_count_leading_equal(keys: &[i16], start_idx: usize) -> u32 {
fn buf_count_leading_equal(keys: &[i16], start_idx: usize) -> u64 {
if start_idx == keys.len() - 1 {
return 1;
}
@@ -783,8 +784,8 @@ fn buf_count_leading_equal(keys: &[i16], start_idx: usize) -> u32 {
}

// SAFETY: We limit the size of the vector (used to provide the slice given to us here) to be no larger than 2^32,
// so we can't exceed u32 here.
(idx - start_idx) as u32
// so we can't exceed u64 here.
(idx - start_idx) as u64
}

fn trim_left(bins: &mut SmallVec<[Bin; 4]>, bin_limit: u16) {
@@ -800,15 +801,15 @@ fn trim_left(bins: &mut SmallVec<[Bin; 4]>, bin_limit: u16) {
let mut overflow = SmallVec::<[Bin; 4]>::new();

for bin in bins.iter().take(num_to_remove) {
missing += u32::from(bin.n);
missing += u64::from(bin.n);

if missing > u32::from(MAX_BIN_WIDTH) {
if missing > u64::from(MAX_BIN_WIDTH) {
overflow.push(Bin {
k: bin.k,
n: MAX_BIN_WIDTH,
});

missing -= u32::from(MAX_BIN_WIDTH);
missing -= u64::from(MAX_BIN_WIDTH);
}
}

@@ -830,12 +831,12 @@
}

#[allow(clippy::cast_possible_truncation)]
fn generate_bins(bins: &mut SmallVec<[Bin; 4]>, k: i16, n: u32) {
if n < u32::from(MAX_BIN_WIDTH) {
fn generate_bins(bins: &mut SmallVec<[Bin; 4]>, k: i16, n: u64) {
if n < u64::from(MAX_BIN_WIDTH) {
// SAFETY: Cannot truncate `n`, as it's less than a u16 value.
bins.push(Bin { k, n: n as u16 });
} else {
let overflow = n % u32::from(MAX_BIN_WIDTH);
let overflow = n % u64::from(MAX_BIN_WIDTH);
if overflow != 0 {
bins.push(Bin {
k,
Expand All @@ -844,7 +845,7 @@ fn generate_bins(bins: &mut SmallVec<[Bin; 4]>, k: i16, n: u32) {
});
}

for _ in 0..(n / u32::from(MAX_BIN_WIDTH)) {
for _ in 0..(n / u64::from(MAX_BIN_WIDTH)) {
bins.push(Bin { k, n: MAX_BIN_WIDTH });
}
}
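A quick worked example of the splitting performed by `generate_bins` above, assuming `MAX_BIN_WIDTH` is 65,535 (the `u16` maximum; the actual constant is not shown in this diff):

// generate_bins(&mut bins, k, 150_000):
//   overflow = 150_000 % 65_535 = 18_930   -> push Bin { k, n: 18_930 }
//   150_000 / 65_535 = 2                   -> push Bin { k, n: 65_535 } twice
// 18_930 + 2 * 65_535 = 150_000, so no observations are lost.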
@@ -1089,7 +1090,7 @@ mod tests {
enum Value {
Float(f64),
Vec(Vec<f64>),
NFloats(u32, f64),
NFloats(u64, f64),
}
/// Ways to insert values into a sketch.
#[derive(Debug)]
@@ -1214,7 +1215,7 @@ mod tests {
struct Case {
lower: f64,
upper: f64,
count: u32,
count: u64,
allowed_err: f64,
expected: &'static str,
}
@@ -1227,7 +1228,7 @@
assert_eq!(actual.bins(), expected.bins());
compare_sketches(actual, &expected, case.allowed_err);

let actual_count: u32 = actual.bins.iter().map(|b| u32::from(b.n)).sum();
let actual_count: u64 = actual.bins.iter().map(|b| u64::from(b.n)).sum();
assert_eq!(actual_count, case.count);
};

@@ -751,7 +751,7 @@ fn encode_sketch_metric(
// We convert histograms to sketches to be able to write them out in the payload.
let mut ddsketch = DDSketch::default();
for sample in histogram.samples() {
ddsketch.insert_n(sample.value.into_inner(), sample.weight as u32);
ddsketch.insert_n(sample.value.into_inner(), sample.weight);
}

write_dogsketch(output_stream, scratch_buf, packed_scratch_buf, timestamp, &ddsketch)?;
22 changes: 21 additions & 1 deletion lib/saluki-components/src/sources/dogstatsd/mod.rs
@@ -125,6 +125,10 @@ const fn default_dogstatsd_permissive_decoding() -> bool {
true
}

const fn default_dogstatsd_minimum_sample_rate() -> f64 {
0.00000025
}

const fn default_enable_payloads_series() -> bool {
true
}
@@ -279,6 +283,21 @@ pub struct DogStatsDConfiguration {
)]
permissive_decoding: bool,

/// The minimum sample rate allowed for metrics.
///
/// When a metric is sent with a sample rate _lower_ than this value, the rate is clamped to this value. This
/// bounds how many equivalent samples a single metric point can represent: very small sample rates (such as
/// `0.00000001`) imply enormous per-point sample counts and can lead to large memory growth.
///
/// A warning log will be emitted when clamping occurs, as this represents an effective loss of metric samples.
///
/// Defaults to `0.00000025` (a maximum of 4,000,000 represented samples per point).
#[serde(
rename = "dogstatsd_minimum_sample_rate",
default = "default_dogstatsd_minimum_sample_rate"
)]
minimum_sample_rate: f64,

/// Whether or not to enable sending serie payloads.
///
/// Defaults to `true`.
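To make the `dogstatsd_minimum_sample_rate` clamping described above concrete, here is a rough sketch of how a sample rate maps to the number of samples a single point represents, and how clamping the rate bounds that number (hypothetical helper, not the actual codec code):

// A metric received with sample rate `r` stands in for roughly 1/r real samples,
// so clamping `r` from below caps that multiplier. With the default minimum of
// 0.00000025, one point can represent at most 4,000,000 samples.
fn effective_weight(sample_rate: f64, minimum_sample_rate: f64) -> u64 {
    let clamped = sample_rate.max(minimum_sample_rate);
    (1.0 / clamped).round() as u64
}

fn main() {
    assert_eq!(effective_weight(0.5, 0.00000025), 2);
    // A pathologically small rate gets clamped, bounding memory growth:
    assert_eq!(effective_weight(0.00000001, 0.00000025), 4_000_000);
}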
@@ -414,7 +433,8 @@ impl SourceBuilder for DogStatsDConfiguration {

let codec_config = DogstatsdCodecConfiguration::default()
.with_timestamps(self.no_aggregation_pipeline_support)
.with_permissive_mode(self.permissive_decoding);
.with_permissive_mode(self.permissive_decoding)
.with_minimum_sample_rate(self.minimum_sample_rate);

let codec = DogstatsdCodec::from_configuration(codec_config);

2 changes: 1 addition & 1 deletion lib/saluki-components/src/transforms/aggregate/mod.rs
@@ -709,7 +709,7 @@ async fn transform_and_push_metric(
.map(|(ts, hist)| {
let mut sketch = DDSketch::default();
for sample in hist.samples() {
sketch.insert_n(sample.value.into_inner(), sample.weight as u32);
sketch.insert_n(sample.value.into_inner(), sample.weight);
}
(ts, sketch)
})
7 changes: 1 addition & 6 deletions lib/saluki-core/src/data_model/event/metric/value/mod.rs
@@ -305,16 +305,11 @@ impl MetricValues {
I: Iterator<Item = Result<f64, E>>,
{
let sample_rate = sample_rate.unwrap_or(SampleRate::unsampled());
let capped_sample_rate = u32::try_from(sample_rate.weight()).unwrap_or(u32::MAX);

let mut sketch = DDSketch::default();
for value in iter {
let value = value?;
if capped_sample_rate == 1 {
sketch.insert(value);
} else {
sketch.insert_n(value, capped_sample_rate);
}
sketch.insert_n(value, sample_rate.weight());
}
Ok(Self::Distribution(sketch.into()))
}