Skip to content

Commit 47895af

Browse files
authored
[processor/cumulativetodelta] Add support for exponential histograms (#331)
* Remove unneeded min/max entries * Add support for exponential histograms * Update README
1 parent 45c88b5 commit 47895af

File tree

7 files changed

+779
-27
lines changed

7 files changed

+779
-27
lines changed

processor/cumulativetodeltaprocessor/README.md

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -15,16 +15,16 @@
1515

1616
## Description
1717

18-
The cumulative to delta processor (`cumulativetodeltaprocessor`) converts monotonic, cumulative sum and histogram metrics to monotonic, delta metrics. Non-monotonic sums and exponential histograms are excluded.
18+
The cumulative to delta processor (`cumulativetodeltaprocessor`) converts monotonic, cumulative sum, histogram, and exponential histogram metrics to monotonic, delta metrics. Non-monotonic sums are excluded.
1919

2020
## Configuration
2121

2222
Configuration is specified through a list of metrics. The processor uses metric names to identify a set of cumulative metrics and converts them from cumulative to delta.
2323

2424
The following settings can be optionally configured:
2525

26-
- `include`: List of metrics names (case-insensitive), patterns or metric types to convert to delta. Valid values are: `sum`, `histogram`.
27-
- `exclude`: List of metrics names (case-insensitive), patterns or metric types to not convert to delta. **If a metric name matches both include and exclude, exclude takes precedence.** Valid values are: `sum`, `histogram`.
26+
- `include`: List of metrics names (case-insensitive), patterns or metric types to convert to delta. Valid values are: `sum`, `histogram`, `exponentialhistogram`.
27+
- `exclude`: List of metrics names (case-insensitive), patterns or metric types to not convert to delta. **If a metric name matches both include and exclude, exclude takes precedence.** Valid values are: `sum`, `histogram`, `exponentialhistogram`.
2828
- `max_staleness`: The total time a state entry will live past the time it was last seen. Set to 0 to retain state indefinitely. Default: 0
2929
- `initial_value`: Handling of the first observed point for a given metric identity.
3030
When the collector (re)starts, there's no record of how much of a given cumulative counter has already been converted to delta values.

processor/cumulativetodeltaprocessor/internal/tracking/identity.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -71,5 +71,5 @@ func (mi *MetricIdentity) IsFloatVal() bool {
7171
}
7272

7373
func (mi *MetricIdentity) IsSupportedMetricType() bool {
74-
return mi.MetricType == pmetric.MetricTypeSum || mi.MetricType == pmetric.MetricTypeHistogram
74+
return mi.MetricType == pmetric.MetricTypeSum || mi.MetricType == pmetric.MetricTypeHistogram || mi.MetricType == pmetric.MetricTypeExponentialHistogram
7575
}

processor/cumulativetodeltaprocessor/internal/tracking/identity_test.go

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -167,23 +167,23 @@ func TestMetricIdentity_IsSupportedMetricType(t *testing.T) {
167167
want: true,
168168
},
169169
{
170-
name: "none",
170+
name: "exponential_histogram",
171171
fields: fields{
172-
MetricType: pmetric.MetricTypeEmpty,
172+
MetricType: pmetric.MetricTypeExponentialHistogram,
173173
},
174-
want: false,
174+
want: true,
175175
},
176176
{
177-
name: "gauge",
177+
name: "none",
178178
fields: fields{
179-
MetricType: pmetric.MetricTypeGauge,
179+
MetricType: pmetric.MetricTypeEmpty,
180180
},
181181
want: false,
182182
},
183183
{
184-
name: "exponential_histogram",
184+
name: "gauge",
185185
fields: fields{
186-
MetricType: pmetric.MetricTypeExponentialHistogram,
186+
MetricType: pmetric.MetricTypeGauge,
187187
},
188188
want: false,
189189
},

processor/cumulativetodeltaprocessor/internal/tracking/tracker.go

Lines changed: 37 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -65,10 +65,11 @@ type State struct {
6565
}
6666

6767
type DeltaValue struct {
68-
StartTimestamp pcommon.Timestamp
69-
FloatValue float64
70-
IntValue int64
71-
HistogramValue *HistogramPoint
68+
StartTimestamp pcommon.Timestamp
69+
FloatValue float64
70+
IntValue int64
71+
HistogramValue *HistogramPoint
72+
ExpHistogramValue *ExpHistogramPoint
7273
}
7374

7475
func NewMetricTracker(ctx context.Context, logger *zap.Logger, maxStaleness time.Duration, initalValue InitialValue) *MetricTracker {
@@ -123,7 +124,10 @@ func (t *MetricTracker) Convert(in MetricPoint) (out DeltaValue, valid bool) {
123124
case pmetric.MetricTypeSum:
124125
out.IntValue = metricPoint.IntValue
125126
out.FloatValue = metricPoint.FloatValue
126-
case pmetric.MetricTypeEmpty, pmetric.MetricTypeGauge, pmetric.MetricTypeExponentialHistogram, pmetric.MetricTypeSummary:
127+
case pmetric.MetricTypeExponentialHistogram:
128+
val := metricPoint.ExpHistogramValue.Clone()
129+
out.ExpHistogramValue = &val
130+
case pmetric.MetricTypeEmpty, pmetric.MetricTypeGauge, pmetric.MetricTypeSummary:
127131
}
128132
switch t.initialValue {
129133
case InitialValueAuto:
@@ -171,6 +175,33 @@ func (t *MetricTracker) Convert(in MetricPoint) (out DeltaValue, valid bool) {
171175
}
172176

173177
out.HistogramValue = &delta
178+
case pmetric.MetricTypeExponentialHistogram:
179+
value := metricPoint.ExpHistogramValue
180+
prevValue := state.PrevPoint.ExpHistogramValue
181+
if math.IsNaN(value.Sum) {
182+
value.Sum = prevValue.Sum
183+
}
184+
185+
if !value.isCompatible(prevValue) || value.isReset(prevValue) {
186+
valid = false
187+
}
188+
189+
delta := value.Clone()
190+
191+
if valid {
192+
delta.Count -= prevValue.Count
193+
delta.Sum -= prevValue.Sum
194+
delta.ZeroCount -= prevValue.ZeroCount
195+
196+
for index := 0; index < len(delta.PosBuckets) && index < len(prevValue.PosBuckets); index++ {
197+
delta.PosBuckets[index] -= prevValue.PosBuckets[index]
198+
}
199+
for index := 0; index < len(delta.NegBuckets) && index < len(prevValue.NegBuckets); index++ {
200+
delta.NegBuckets[index] -= prevValue.NegBuckets[index]
201+
}
202+
}
203+
204+
out.ExpHistogramValue = &delta
174205
case pmetric.MetricTypeSum:
175206
if metricID.IsFloatVal() {
176207
value := metricPoint.FloatValue
@@ -195,7 +226,7 @@ func (t *MetricTracker) Convert(in MetricPoint) (out DeltaValue, valid bool) {
195226

196227
out.IntValue = delta
197228
}
198-
case pmetric.MetricTypeEmpty, pmetric.MetricTypeGauge, pmetric.MetricTypeExponentialHistogram, pmetric.MetricTypeSummary:
229+
case pmetric.MetricTypeEmpty, pmetric.MetricTypeGauge, pmetric.MetricTypeSummary:
199230
}
200231

201232
state.PrevPoint = metricPoint

processor/cumulativetodeltaprocessor/internal/tracking/value.go

Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ type ValuePoint struct {
1010
FloatValue float64
1111
IntValue int64
1212
HistogramValue *HistogramPoint
13+
ExpHistogramValue *ExpHistogramPoint
1314
}
1415

1516
type HistogramPoint struct {
@@ -28,3 +29,51 @@ func (point *HistogramPoint) Clone() HistogramPoint {
2829
Buckets: bucketValues,
2930
}
3031
}
32+
33+
type ExpHistogramPoint struct {
34+
Scale int32
35+
NegativeOffsetIndex int32
36+
PositiveOffsetIndex int32
37+
ZeroThreshold float64
38+
Count uint64
39+
Sum float64
40+
ZeroCount uint64
41+
PosBuckets []uint64
42+
NegBuckets []uint64
43+
}
44+
45+
func (point *ExpHistogramPoint) Clone() ExpHistogramPoint {
46+
posBucketValues := make([]uint64, len(point.PosBuckets))
47+
copy(posBucketValues, point.PosBuckets)
48+
negBucketValues := make([]uint64, len(point.NegBuckets))
49+
copy(negBucketValues, point.NegBuckets)
50+
51+
return ExpHistogramPoint{
52+
Count: point.Count,
53+
Sum: point.Sum,
54+
Scale: point.Scale,
55+
ZeroCount: point.ZeroCount,
56+
ZeroThreshold: point.ZeroThreshold,
57+
PositiveOffsetIndex: point.PositiveOffsetIndex,
58+
NegativeOffsetIndex: point.NegativeOffsetIndex,
59+
PosBuckets: posBucketValues,
60+
NegBuckets: negBucketValues,
61+
}
62+
}
63+
64+
func (point *ExpHistogramPoint) isCompatible(other *ExpHistogramPoint) bool {
65+
// NOTE: this constraint could be relaxed but requires changes to tracker to handle perfect subsetting and handling
66+
// index offsetting
67+
return point.Scale == other.Scale &&
68+
point.PositiveOffsetIndex == other.PositiveOffsetIndex &&
69+
point.NegativeOffsetIndex == other.NegativeOffsetIndex &&
70+
point.ZeroThreshold == other.ZeroThreshold
71+
}
72+
73+
func (point *ExpHistogramPoint) isReset(prev *ExpHistogramPoint) bool {
74+
// A drop in count or decrease in number of buckets indicates a reset
75+
return point.Count < prev.Count ||
76+
point.ZeroCount < prev.ZeroCount ||
77+
len(point.PosBuckets) < len(prev.PosBuckets) ||
78+
len(point.NegBuckets) < len(prev.NegBuckets)
79+
}

processor/cumulativetodeltaprocessor/processor.go

Lines changed: 79 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -69,6 +69,8 @@ func getMetricTypeFilter(types []string) (map[pmetric.MetricType]bool, error) {
6969
res[pmetric.MetricTypeSum] = true
7070
case strings.ToLower(pmetric.MetricTypeHistogram.String()):
7171
res[pmetric.MetricTypeHistogram] = true
72+
case strings.ToLower(pmetric.MetricTypeExponentialHistogram.String()):
73+
res[pmetric.MetricTypeExponentialHistogram] = true
7274
default:
7375
return nil, fmt.Errorf("unsupported metric type filter: %s", t)
7476
}
@@ -131,7 +133,31 @@ func (ctdp *cumulativeToDeltaProcessor) processMetrics(_ context.Context, md pme
131133

132134
ms.SetAggregationTemporality(pmetric.AggregationTemporalityDelta)
133135
return ms.DataPoints().Len() == 0
134-
case pmetric.MetricTypeEmpty, pmetric.MetricTypeGauge, pmetric.MetricTypeExponentialHistogram, pmetric.MetricTypeSummary:
136+
case pmetric.MetricTypeExponentialHistogram:
137+
ms := m.ExponentialHistogram()
138+
if ms.AggregationTemporality() != pmetric.AggregationTemporalityCumulative {
139+
return false
140+
}
141+
142+
if ms.DataPoints().Len() == 0 {
143+
return false
144+
}
145+
146+
baseIdentity := tracking.MetricIdentity{
147+
Resource: rm.Resource(),
148+
InstrumentationLibrary: ilm.Scope(),
149+
MetricType: m.Type(),
150+
MetricName: m.Name(),
151+
MetricUnit: m.Unit(),
152+
MetricIsMonotonic: true,
153+
MetricValueType: pmetric.NumberDataPointValueTypeInt,
154+
}
155+
156+
ctdp.convertExponentialHistogramDataPoints(ms.DataPoints(), baseIdentity)
157+
158+
ms.SetAggregationTemporality(pmetric.AggregationTemporalityDelta)
159+
return ms.DataPoints().Len() == 0
160+
case pmetric.MetricTypeEmpty, pmetric.MetricTypeGauge, pmetric.MetricTypeSummary:
135161
fallthrough
136162
default:
137163
return false
@@ -240,3 +266,55 @@ func (ctdp *cumulativeToDeltaProcessor) convertHistogramDataPoints(in any, baseI
240266
})
241267
}
242268
}
269+
270+
func (ctdp *cumulativeToDeltaProcessor) convertExponentialHistogramDataPoints(in any, baseIdentity tracking.MetricIdentity) {
271+
if dps, ok := in.(pmetric.ExponentialHistogramDataPointSlice); ok {
272+
dps.RemoveIf(func(dp pmetric.ExponentialHistogramDataPoint) bool {
273+
id := baseIdentity
274+
id.StartTimestamp = dp.StartTimestamp()
275+
id.Attributes = dp.Attributes()
276+
277+
if dp.Flags().NoRecordedValue() {
278+
// drop points with no value
279+
return true
280+
}
281+
282+
point := tracking.ValuePoint{
283+
ObservedTimestamp: dp.Timestamp(),
284+
ExpHistogramValue: &tracking.ExpHistogramPoint{
285+
Count: dp.Count(),
286+
Sum: dp.Sum(),
287+
Scale: dp.Scale(),
288+
ZeroCount: dp.ZeroCount(),
289+
ZeroThreshold: dp.ZeroThreshold(),
290+
PositiveOffsetIndex: dp.Positive().Offset(),
291+
NegativeOffsetIndex: dp.Negative().Offset(),
292+
PosBuckets: dp.Positive().BucketCounts().AsRaw(),
293+
NegBuckets: dp.Negative().BucketCounts().AsRaw(),
294+
},
295+
}
296+
297+
trackingPoint := tracking.MetricPoint{
298+
Identity: id,
299+
Value: point,
300+
}
301+
delta, valid := ctdp.deltaCalculator.Convert(trackingPoint)
302+
303+
if valid {
304+
dp.SetStartTimestamp(delta.StartTimestamp)
305+
dp.SetCount(delta.ExpHistogramValue.Count)
306+
if dp.HasSum() && !math.IsNaN(dp.Sum()) {
307+
dp.SetSum(delta.ExpHistogramValue.Sum)
308+
}
309+
dp.Positive().BucketCounts().FromRaw(delta.ExpHistogramValue.PosBuckets)
310+
dp.Negative().BucketCounts().FromRaw(delta.ExpHistogramValue.NegBuckets)
311+
dp.SetZeroCount(delta.ExpHistogramValue.ZeroCount)
312+
dp.RemoveMin()
313+
dp.RemoveMax()
314+
return false
315+
}
316+
317+
return !valid
318+
})
319+
}
320+
}

0 commit comments

Comments
 (0)