Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,11 @@ public Iterable<Interval> getIntervals() {
new Interval(Interval.Quantile.P_99_9, (float) timer.get999thPercentile() * durationFactor)
);
}

@Override
public long[] getValues() {
return timer.values();
}
};
}

Expand All @@ -108,6 +113,11 @@ public Iterable<Interval> getIntervals() {
new Interval(Interval.Quantile.P_99_9, (float) histogram.get999thPercentile())
);
}

@Override
public long[] getValues() {
return histogram.values();
}
};
}

Expand All @@ -125,6 +135,11 @@ public Iterable<Interval> getIntervals() {

return Interval.asIntervals(Interval.Quantile.STANDARD_PERCENTILES, q -> (float) snapshot.getValue(q.value));
}

@Override
public long[] getValues() {
return metric.getSnapshot().getValues();
}
};
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import org.apache.cassandra.metrics.CassandraMetricsRegistry.JmxMeterMBean;
import org.apache.cassandra.utils.EstimatedHistogram;

import java.util.ArrayList;
import java.util.stream.Stream;

public final class CollectorFunctions {
Expand Down Expand Up @@ -176,4 +177,58 @@ protected static CollectorFunction<SamplingCounting> samplingAndCountingAsSummar
public static CollectorFunction<SamplingCounting> samplingAndCountingAsSummary() {
return samplingAndCountingAsSummary(FloatFloatFunction.identity());
}

/**
* Collect a {@link SamplingCounting} as a Prometheus histogram.
*/
protected static CollectorFunction<SamplingCounting> samplingAndCountingAsHistogram(final FloatFloatFunction bucketScaleFunction) {
// Set some limits on the range so we don't export all 170 buckets
float bucketMin = 0.0001f; // 0.1ms
float bucketMax = 60.0f; // 60sec

// Avoid recomputing the buckets frequently. Cassandra uses ~170 buckets
float[] cachedBuckets = newBucketOffsets(200, bucketScaleFunction);

return group -> {
final Stream<HistogramMetricFamily.Histogram> histogramStream = group.labeledObjects().entrySet().stream()
.map(e -> {
long[] values = e.getValue().getValues();
float[] buckets = values.length <= cachedBuckets.length
? cachedBuckets
: newBucketOffsets(values.length, bucketScaleFunction);

float sum = 0;
long count = 0;
ArrayList<Interval> intervals = new ArrayList<>();
assert values[values.length-1] == 0;

for (int i = 0; i < values.length; i++) {
if (values[i] != 0) {
sum += buckets[i] * values[i];
count += values[i];
}
if (buckets[i] >= bucketMin && buckets[i] <= bucketMax) {
intervals.add(new Interval(new Interval.Quantile(buckets[i]), count));
}
}

return new HistogramMetricFamily.Histogram(e.getKey(), sum, count, intervals);
});

return Stream.of(new HistogramMetricFamily(group.name(), group.help(), histogramStream));
};
}

public static CollectorFunction<SamplingCounting> samplingAndCountingAsHistogram() {
return samplingAndCountingAsHistogram(FloatFloatFunction.identity());
}

private static float[] newBucketOffsets(int size, final FloatFloatFunction bucketScaleFunction) {
long[] rawOffsets = EstimatedHistogram.newOffsets(size, false);
float[] adjustedOffsets = new float[size];
for (int i = 0; i < size; i++) {
adjustedOffsets[i] = bucketScaleFunction.apply(rawOffsets[i]);
}
return adjustedOffsets;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -510,6 +510,15 @@ private static FactoryBuilder.CollectorConstructor histogramAsSummaryCollectorCo
};
}

private static FactoryBuilder.CollectorConstructor histogramAsHistogramCollectorConstructor() {
return (name, help, labels, mBean) -> {
final NamedObject<SamplingCounting> samplingCountingNamedObject = CassandraMetricsUtilities.jmxHistogramAsSamplingCounting(mBean);

return new FunctionalMetricFamilyCollector<>(name, help, ImmutableMap.of(labels, samplingCountingNamedObject),
samplingAndCountingAsHistogram(MetricValueConversionFunctions::nanosecondsToSeconds));
};
}

private static <T> FactoryBuilder.CollectorConstructor functionalCollectorConstructor(final FunctionalMetricFamilyCollector.CollectorFunction<T> function) {
return (final String name, final String help, final Labels labels, final NamedObject<?> mBean) ->
new FunctionalMetricFamilyCollector<>(name, help, ImmutableMap.of(labels, mBean.<T>cast()), function);
Expand Down Expand Up @@ -592,6 +601,8 @@ public List<Factory> get() {

builder.add(clientRequestMetricFactory(LatencyMetricGroupSummaryCollector::collectorForMBean, "Latency", "latency_seconds", "Request latency."));
builder.add(clientRequestMetricFactory(LatencyMetricGroupSummaryCollector::collectorForMBean, "TotalLatency", "latency_seconds", "Total request duration."));

builder.add(clientRequestMetricFactory(histogramAsHistogramCollectorConstructor(), "Latency", "latency_hist_seconds", "Request latency."));
}


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,4 +13,6 @@ public interface SamplingCounting {
long getCount();

Iterable<Interval> getIntervals();

long[] getValues();
}