
Commit 9df7f9e

[exporter/prometheus] Expose native histograms (#43053)
#### Description
Adds support for OpenTelemetry exponential histograms to the Prometheus exporter, exposing them as Prometheus native histograms.

#### Link to tracking issue
Closes #33703
1 parent 31a8be2 commit 9df7f9e

5 files changed (+894, −0 lines)
Changelog entry (.chloggen)

Lines changed: 27 additions & 0 deletions
@@ -0,0 +1,27 @@
+# Use this changelog template to create an entry for release notes.
+
+# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
+change_type: enhancement
+
+# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
+component: exporter/prometheus
+
+# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
+note: Add support for exponential histograms
+
+# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
+issues: [33703]
+
+# (Optional) One or more lines of additional information to render under the primary note.
+# These lines will be padded with 2 spaces and then inserted directly into the document.
+# Use pipe (|) for multiline entries.
+subtext:
+
+# If your change doesn't affect end users or the exported elements of any package,
+# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
+# Optional: The change log or logs in which this entry should be included.
+# e.g. '[user]' or '[user, api]'
+# Include 'user' if the change is relevant to end users.
+# Include 'api' if there is a change to a library API.
+# Default: '[user]'
+change_logs: [user]

exporter/prometheusexporter/accumulator.go

Lines changed: 250 additions & 0 deletions
@@ -5,6 +5,7 @@ package prometheusexporter // import "github.com/open-telemetry/opentelemetry-co
 
 import (
 	"fmt"
+	"math"
 	"sort"
 	"strings"
 	"sync"
@@ -90,6 +91,8 @@ func (a *lastValueAccumulator) addMetric(metric pmetric.Metric, scopeName, scope
 		return a.accumulateHistogram(metric, scopeName, scopeVersion, scopeSchemaURL, scopeAttributes, resourceAttrs, now)
 	case pmetric.MetricTypeSummary:
 		return a.accumulateSummary(metric, scopeName, scopeVersion, scopeSchemaURL, scopeAttributes, resourceAttrs, now)
+	case pmetric.MetricTypeExponentialHistogram:
+		return a.accumulateExponentialHistogram(metric, scopeName, scopeVersion, scopeSchemaURL, scopeAttributes, resourceAttrs, now)
 	default:
 		a.logger.With(
 			zap.String("data_type", metric.Type().String()),
@@ -297,6 +300,77 @@ func (a *lastValueAccumulator) accumulateHistogram(metric pmetric.Metric, scopeN
 	return n
 }
 
+func (a *lastValueAccumulator) accumulateExponentialHistogram(metric pmetric.Metric, scopeName, scopeVersion, scopeSchemaURL string, scopeAttributes, resourceAttrs pcommon.Map, now time.Time) (n int) {
+	expHistogram := metric.ExponentialHistogram()
+	a.logger.Debug("Accumulate native histogram.....")
+	dps := expHistogram.DataPoints()
+
+	for i := 0; i < dps.Len(); i++ {
+		ip := dps.At(i)
+		signature := timeseriesSignature(scopeName, scopeVersion, scopeSchemaURL, scopeAttributes, metric, ip.Attributes(), resourceAttrs) // uniquely identifies the time series being accumulated
+		if ip.Flags().NoRecordedValue() {
+			a.registeredMetrics.Delete(signature)
+			return 0
+		}
+
+		v, ok := a.registeredMetrics.Load(signature) // the accumulator stores values for all time series; load the value for this particular series
+		if !ok {
+			// first data point
+			m := copyMetricMetadata(metric)
+			ip.CopyTo(m.SetEmptyExponentialHistogram().DataPoints().AppendEmpty())
+			m.ExponentialHistogram().SetAggregationTemporality(pmetric.AggregationTemporalityCumulative)
+			a.registeredMetrics.Store(signature, &accumulatedValue{value: m, resourceAttrs: resourceAttrs, scopeName: scopeName, scopeVersion: scopeVersion, scopeSchemaURL: scopeSchemaURL, scopeAttributes: scopeAttributes, updated: now})
+			n++
+			continue
+		}
+		mv := v.(*accumulatedValue)
+
+		m := copyMetricMetadata(metric)
+		m.SetEmptyExponentialHistogram().SetAggregationTemporality(pmetric.AggregationTemporalityCumulative)
+
+		switch expHistogram.AggregationTemporality() {
+		case pmetric.AggregationTemporalityDelta:
+			pp := mv.value.ExponentialHistogram().DataPoints().At(0) // previous aggregated value for the time range
+			if ip.StartTimestamp().AsTime() != pp.Timestamp().AsTime() {
+				// treat misalignment as a restart and reset, or as a violation of the single-writer principle and drop
+				a.logger.With(
+					zap.String("ip_start_time", ip.StartTimestamp().String()),
+					zap.String("pp_start_time", pp.StartTimestamp().String()),
+					zap.String("pp_timestamp", pp.Timestamp().String()),
+					zap.String("ip_timestamp", ip.Timestamp().String()),
+				).Warn("Misaligned starting timestamps")
+				if !ip.StartTimestamp().AsTime().After(pp.Timestamp().AsTime()) {
+					a.logger.With(
+						zap.String("metric_name", metric.Name()),
+					).Warn("Dropped misaligned histogram datapoint")
+					continue
+				}
+				a.logger.Debug("Treating it like reset")
+				ip.CopyTo(m.ExponentialHistogram().DataPoints().AppendEmpty())
+			} else {
+				a.logger.Debug("Accumulate another histogram datapoint")
+				accumulateExponentialHistogramValues(pp, ip, m.ExponentialHistogram().DataPoints().AppendEmpty())
+			}
+		case pmetric.AggregationTemporalityCumulative:
+			if ip.Timestamp().AsTime().Before(mv.value.ExponentialHistogram().DataPoints().At(0).Timestamp().AsTime()) {
+				// only keep the datapoint with the latest timestamp
+				continue
+			}
+
+			ip.CopyTo(m.ExponentialHistogram().DataPoints().AppendEmpty())
+		default:
+			// unsupported temporality
+			continue
+		}
+
+		// Store the updated metric and advance the count
+		a.registeredMetrics.Store(signature, &accumulatedValue{value: m, resourceAttrs: resourceAttrs, scopeName: scopeName, scopeVersion: scopeVersion, scopeSchemaURL: scopeSchemaURL, scopeAttributes: scopeAttributes, updated: now})
+		n++
+	}
+
+	return n
+}
+
 // Collect returns a slice with relevant aggregated metrics and their resource attributes.
 func (a *lastValueAccumulator) Collect() ([]pmetric.Metric, []pcommon.Map, []string, []string, []string, []pcommon.Map) {
 	a.logger.Debug("Accumulator collect called")
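The delta branch above turns a timestamp comparison into a three-way decision: an aligned start extends the accumulated series, a start after the previous end is treated as a counter reset, and an overlapping start is dropped as a single-writer violation. A minimal standalone sketch of that rule (editor's illustration, not from this commit; classifyDeltaPoint is a hypothetical helper):

package main

import (
	"fmt"
	"time"
)

// classifyDeltaPoint mirrors the alignment check in accumulateExponentialHistogram.
func classifyDeltaPoint(prevEnd, curStart time.Time) string {
	switch {
	case curStart.Equal(prevEnd):
		return "merge" // aligned: fold the delta into the running cumulative value
	case curStart.After(prevEnd):
		return "reset" // gap: treat as a restart and keep only the new point
	default:
		return "drop" // overlap: single-writer violation, discard the point
	}
}

func main() {
	t0 := time.Unix(100, 0)
	fmt.Println(classifyDeltaPoint(t0, t0))                     // merge
	fmt.Println(classifyDeltaPoint(t0, t0.Add(5*time.Second)))  // reset
	fmt.Println(classifyDeltaPoint(t0, t0.Add(-5*time.Second))) // drop
}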
@@ -406,3 +480,179 @@ func accumulateHistogramValues(prev, current, dest pmetric.HistogramDataPoint) {
 
 	dest.ExplicitBounds().FromRaw(newer.ExplicitBounds().AsRaw())
 }
+
+// calculateBucketUpperBound calculates the upper bound for an exponential histogram bucket
+func calculateBucketUpperBound(scale, offset int32, index int) float64 {
+	// For exponential histograms with base = 2:
+	// Upper bound = 2^(scale + offset + index + 1)
+	return math.Pow(2, float64(scale+offset+int32(index)+1))
+}
+
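For reference, the OpenTelemetry data model defines exponential bucket boundaries through base = 2^(2^-scale), giving bucket index k the upper bound 2^((k+1) * 2^-scale); at scale 0 the base is 2 and this reduces to 2^(k+1), the case the computation above matches exactly. A hypothetical in-package test sketch (assumes a _test.go file alongside accumulator.go with "testing" imported) checking that scale-0 case:

func TestCalculateBucketUpperBound_Scale0(t *testing.T) {
	// At scale 0 and offset 0, array index 2 is bucket index 2, covering (4, 8].
	if got := calculateBucketUpperBound(0, 0, 2); got != 8 {
		t.Fatalf("want 8, got %v", got)
	}
}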
+// filterBucketsForZeroThreshold filters buckets that fall below the zero threshold
+// and returns the filtered buckets and the additional zero count
+func filterBucketsForZeroThreshold(offset int32, counts []uint64, scale int32, zeroThreshold float64) (newOffset int32, filteredCounts []uint64, additionalZeroCount uint64) {
+	if len(counts) == 0 || zeroThreshold <= 0 {
+		return offset, counts, 0
+	}
+
+	additionalZeroCount = uint64(0)
+	filteredCounts = make([]uint64, 0, len(counts))
+	newOffset = offset
+
+	// Find the first bucket whose upper bound is > zeroThreshold
+	for i, count := range counts {
+		upperBound := calculateBucketUpperBound(scale, offset, i)
+		if upperBound > zeroThreshold {
+			filteredCounts = append(filteredCounts, counts[i:]...)
+			break
+		}
+		// This bucket's range falls entirely below the zero threshold
+		additionalZeroCount += count
+		newOffset = offset + int32(i) + 1 // Move the offset to the next bucket
+	}
+
+	// If all buckets were filtered out, return empty buckets
+	if len(filteredCounts) == 0 {
+		return 0, nil, additionalZeroCount
+	}
+
+	return newOffset, filteredCounts, additionalZeroCount
+}
+
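A worked check of the threshold filter: at scale 0 with offset 0, array indexes 0..2 cover (1,2], (2,4], and (4,8]. Raising the zero threshold to 4 absorbs the first two buckets into the zero count and keeps only (4,8]. A hypothetical in-package test sketch (same _test.go assumption as above):

func TestFilterBucketsForZeroThreshold(t *testing.T) {
	off, counts, zeros := filterBucketsForZeroThreshold(0, []uint64{1, 2, 3}, 0, 4)
	// Buckets (1,2] and (2,4] fall at or below the threshold: 1+2 counts move
	// to the zero bucket, and the remaining bucket starts at index 2.
	if off != 2 || len(counts) != 1 || counts[0] != 3 || zeros != 3 {
		t.Fatalf("unexpected result: off=%d counts=%v zeros=%d", off, counts, zeros)
	}
}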
+func accumulateExponentialHistogramValues(prev, current, dest pmetric.ExponentialHistogramDataPoint) {
+	if current.Timestamp().AsTime().Before(prev.Timestamp().AsTime()) {
+		dest.SetStartTimestamp(current.StartTimestamp())
+		prev.Attributes().CopyTo(dest.Attributes())
+		dest.SetTimestamp(prev.Timestamp())
+	} else {
+		dest.SetStartTimestamp(prev.StartTimestamp())
+		current.Attributes().CopyTo(dest.Attributes())
+		dest.SetTimestamp(current.Timestamp())
+	}
+
+	targetScale := min(current.Scale(), prev.Scale())
+	dest.SetScale(targetScale)
+
+	// Determine the new zero threshold (maximum of the two)
+	newZeroThreshold := max(prev.ZeroThreshold(), current.ZeroThreshold())
+	dest.SetZeroThreshold(newZeroThreshold)
+
+	// Downscale buckets to the target scale
+	pPosOff, pPosCounts := downscaleBucketSide(prev.Positive().Offset(), prev.Positive().BucketCounts().AsRaw(), prev.Scale(), targetScale)
+	pNegOff, pNegCounts := downscaleBucketSide(prev.Negative().Offset(), prev.Negative().BucketCounts().AsRaw(), prev.Scale(), targetScale)
+	cPosOff, cPosCounts := downscaleBucketSide(current.Positive().Offset(), current.Positive().BucketCounts().AsRaw(), current.Scale(), targetScale)
+	cNegOff, cNegCounts := downscaleBucketSide(current.Negative().Offset(), current.Negative().BucketCounts().AsRaw(), current.Scale(), targetScale)
+
+	// Filter buckets that fall below the new zero threshold
+	additionalZeroCount := uint64(0)
+
+	// Filter positive buckets from the previous histogram
+	if newZeroThreshold > prev.ZeroThreshold() {
+		var filteredZeroCount uint64
+		pPosOff, pPosCounts, filteredZeroCount = filterBucketsForZeroThreshold(pPosOff, pPosCounts, targetScale, newZeroThreshold)
+		additionalZeroCount += filteredZeroCount
+	}
+
+	// Filter positive buckets from the current histogram
+	if newZeroThreshold > current.ZeroThreshold() {
+		var filteredZeroCount uint64
+		cPosOff, cPosCounts, filteredZeroCount = filterBucketsForZeroThreshold(cPosOff, cPosCounts, targetScale, newZeroThreshold)
+		additionalZeroCount += filteredZeroCount
+	}
+
+	// Merge the remaining buckets
+	mPosOff, mPosCounts := mergeBuckets(pPosOff, pPosCounts, cPosOff, cPosCounts)
+	mNegOff, mNegCounts := mergeBuckets(pNegOff, pNegCounts, cNegOff, cNegCounts)
+
+	dest.Positive().SetOffset(mPosOff)
+	dest.Positive().BucketCounts().FromRaw(mPosCounts)
+	dest.Negative().SetOffset(mNegOff)
+	dest.Negative().BucketCounts().FromRaw(mNegCounts)
+
+	// Set the zero count, including additional counts from filtered buckets
+	dest.SetZeroCount(prev.ZeroCount() + current.ZeroCount() + additionalZeroCount)
+	dest.SetCount(prev.Count() + current.Count())
+
+	if prev.HasSum() && current.HasSum() {
+		dest.SetSum(prev.Sum() + current.Sum())
+	}
+
+	switch {
+	case prev.HasMin() && current.HasMin():
+		dest.SetMin(min(prev.Min(), current.Min()))
+	case prev.HasMin():
+		dest.SetMin(prev.Min())
+	case current.HasMin():
+		dest.SetMin(current.Min())
+	}
+
+	switch {
+	case prev.HasMax() && current.HasMax():
+		dest.SetMax(max(prev.Max(), current.Max()))
+	case prev.HasMax():
+		dest.SetMax(prev.Max())
+	case current.HasMax():
+		dest.SetMax(current.Max())
+	}
+}
+
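The merge therefore (1) downscales both points to the smaller scale, (2) raises the zero threshold to the larger of the two, folding newly sub-threshold buckets into the zero count, and (3) sums the re-based buckets, counts, sums, and min/max. A hypothetical in-package test sketch of the simplest case, same scale and zero thresholds of 0:

func TestAccumulateExponentialHistogramValues_SameScale(t *testing.T) {
	prev := pmetric.NewExponentialHistogramDataPoint()
	prev.SetScale(0)
	prev.SetCount(3)
	prev.Positive().SetOffset(0)
	prev.Positive().BucketCounts().FromRaw([]uint64{1, 2})

	cur := pmetric.NewExponentialHistogramDataPoint()
	cur.SetScale(0)
	cur.SetCount(4)
	cur.Positive().SetOffset(1)
	cur.Positive().BucketCounts().FromRaw([]uint64{3, 1})

	dest := pmetric.NewExponentialHistogramDataPoint()
	accumulateExponentialHistogramValues(prev, cur, dest)

	// Buckets re-base at offset 0 and overlap at index 1: [1, 2+3, 1].
	if dest.Count() != 7 || dest.Positive().BucketCounts().At(1) != 5 {
		t.Fatalf("unexpected merge result: count=%d buckets=%v",
			dest.Count(), dest.Positive().BucketCounts().AsRaw())
	}
}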
+func downscaleBucketSide(offset int32, counts []uint64, fromScale, targetScale int32) (int32, []uint64) {
+	if len(counts) == 0 || fromScale <= targetScale {
+		return offset, counts
+	}
+	shift := fromScale - targetScale
+	factor := int32(1) << shift
+
+	first := offset
+	last := offset + int32(len(counts)) - 1
+	newOffset := floorDivInt32(first, factor)
+	newLast := floorDivInt32(last, factor)
+	newLen := int(newLast - newOffset + 1)
+	for i := range counts {
+		k := offset + int32(i)
+		nk := floorDivInt32(k, factor)
+		if k%factor == 0 {
+			counts[nk-newOffset] = counts[i]
+		} else {
+			counts[nk-newOffset] += counts[i]
+		}
+	}
+	return newOffset, counts[:newLen]
+}
+
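Downscaling by s scales maps source bucket index k to floor(k / 2^s), so each target bucket absorbs 2^s adjacent source buckets. Going from scale 1 to scale 0, for example, folds indexes 4 and 5 into index 2 and index 6 into index 3. A hypothetical in-package test sketch:

func TestDownscaleBucketSide(t *testing.T) {
	off, counts := downscaleBucketSide(4, []uint64{5, 7, 9}, 1, 0)
	// Source indexes 4 and 5 merge (5+7=12) into target index 2; index 6 maps to 3.
	if off != 2 || len(counts) != 2 || counts[0] != 12 || counts[1] != 9 {
		t.Fatalf("unexpected result: off=%d counts=%v", off, counts)
	}
}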
+func mergeBuckets(offsetA int32, countsA []uint64, offsetB int32, countsB []uint64) (int32, []uint64) {
+	if len(countsA) == 0 && len(countsB) == 0 {
+		return 0, nil
+	}
+	if len(countsA) == 0 {
+		return offsetB, countsB
+	}
+	if len(countsB) == 0 {
+		return offsetA, countsA
+	}
+	minOffset := min(offsetB, offsetA)
+	lastA := offsetA + int32(len(countsA)) - 1
+	lastB := offsetB + int32(len(countsB)) - 1
+	maxLast := max(lastB, lastA)
+	newBucketLen := int(maxLast - minOffset + 1)
+	newBucketCount := make([]uint64, newBucketLen)
+	for i := range countsA {
+		k := offsetA + int32(i)
+		newBucketCount[k-minOffset] += countsA[i]
+	}
+	for i := range countsB {
+		k := offsetB + int32(i)
+		newBucketCount[k-minOffset] += countsB[i]
+	}
+	return minOffset, newBucketCount
+}
+
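mergeBuckets re-bases both sides onto the smaller offset and sums counts that land on the same bucket index; the result spans from the smaller first index to the larger last index. A hypothetical in-package test sketch with partially overlapping ranges:

func TestMergeBuckets(t *testing.T) {
	off, counts := mergeBuckets(-1, []uint64{1, 2}, 0, []uint64{3, 4})
	// Indexes -1..0 and 0..1 re-base at offset -1; index 0 overlaps: [1, 2+3, 4].
	if off != -1 || len(counts) != 3 || counts[1] != 5 {
		t.Fatalf("unexpected result: off=%d counts=%v", off, counts)
	}
}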
+func floorDivInt32(a, b int32) int32 {
+	if b <= 0 {
+		return 0
+	}
+	if a >= 0 {
+		return a / b
+	}
+	return -(((-a) + b - 1) / b)
+}
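floorDivInt32 exists because Go's integer division truncates toward zero, while the index mapping above needs flooring toward negative infinity for negative bucket indexes: -3/2 is -1 in Go, but floor(-1.5) is -2. A hypothetical in-package test sketch:

func TestFloorDivInt32(t *testing.T) {
	if got := floorDivInt32(-3, 2); got != -2 {
		t.Fatalf("want -2, got %d", got)
	}
	if got := floorDivInt32(3, 2); got != 1 {
		t.Fatalf("want 1, got %d", got)
	}
}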
