Skip to content

Commit a01288b

Browse files
authored
Type specific exemplar reservoirs (#7758)
1 parent 2753db8 commit a01288b

File tree

52 files changed

+983
-981
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

52 files changed

+983
-981
lines changed

sdk/metrics/src/jmh/java/io/opentelemetry/sdk/metrics/internal/aggregator/HistogramAggregationParam.java

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@
77

88
import static io.opentelemetry.sdk.common.export.MemoryMode.IMMUTABLE_DATA;
99

10-
import io.opentelemetry.sdk.metrics.internal.exemplar.ExemplarReservoir;
10+
import io.opentelemetry.sdk.metrics.internal.exemplar.ExemplarReservoirFactory;
1111
import java.util.Collections;
1212

1313
/** The types of histogram aggregation to benchmark. */
@@ -17,27 +17,27 @@ public enum HistogramAggregationParam {
1717
new DoubleExplicitBucketHistogramAggregator(
1818
ExplicitBucketHistogramUtils.createBoundaryArray(
1919
ExplicitBucketHistogramUtils.DEFAULT_HISTOGRAM_BUCKET_BOUNDARIES),
20-
ExemplarReservoir::doubleNoSamples,
20+
ExemplarReservoirFactory.noSamples(),
2121
IMMUTABLE_DATA)),
2222
EXPLICIT_SINGLE_BUCKET(
2323
new DoubleExplicitBucketHistogramAggregator(
2424
ExplicitBucketHistogramUtils.createBoundaryArray(Collections.emptyList()),
25-
ExemplarReservoir::doubleNoSamples,
25+
ExemplarReservoirFactory.noSamples(),
2626
IMMUTABLE_DATA)),
2727
EXPONENTIAL_SMALL_CIRCULAR_BUFFER(
2828
new DoubleBase2ExponentialHistogramAggregator(
29-
ExemplarReservoir::doubleNoSamples, 20, 0, IMMUTABLE_DATA)),
29+
ExemplarReservoirFactory.noSamples(), 20, 0, IMMUTABLE_DATA)),
3030
EXPONENTIAL_CIRCULAR_BUFFER(
3131
new DoubleBase2ExponentialHistogramAggregator(
32-
ExemplarReservoir::doubleNoSamples, 160, 0, IMMUTABLE_DATA));
32+
ExemplarReservoirFactory.noSamples(), 160, 0, IMMUTABLE_DATA));
3333

34-
private final Aggregator<?, ?> aggregator;
34+
private final Aggregator<?> aggregator;
3535

36-
HistogramAggregationParam(Aggregator<?, ?> aggregator) {
36+
HistogramAggregationParam(Aggregator<?> aggregator) {
3737
this.aggregator = aggregator;
3838
}
3939

40-
public Aggregator<?, ?> getAggregator() {
40+
public Aggregator<?> getAggregator() {
4141
return this.aggregator;
4242
}
4343
}

sdk/metrics/src/jmh/java/io/opentelemetry/sdk/metrics/internal/aggregator/HistogramBenchmark.java

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,8 @@
55

66
package io.opentelemetry.sdk.metrics.internal.aggregator;
77

8+
import io.opentelemetry.api.common.Attributes;
9+
import io.opentelemetry.context.Context;
810
import java.util.concurrent.TimeUnit;
911
import java.util.function.DoubleSupplier;
1012
import org.openjdk.jmh.annotations.Benchmark;
@@ -33,7 +35,7 @@ public class HistogramBenchmark {
3335
public static class ThreadState {
3436
@Param HistogramValueGenerator valueGen;
3537
@Param HistogramAggregationParam aggregation;
36-
private AggregatorHandle<?, ?> aggregatorHandle;
38+
private AggregatorHandle<?> aggregatorHandle;
3739
private DoubleSupplier valueSupplier;
3840

3941
@Setup(Level.Trial)
@@ -45,7 +47,8 @@ public final void setup() {
4547
public void record() {
4648
// Record a number of samples.
4749
for (int i = 0; i < 2000; i++) {
48-
this.aggregatorHandle.recordDouble(valueSupplier.getAsDouble());
50+
this.aggregatorHandle.recordDouble(
51+
valueSupplier.getAsDouble(), Attributes.empty(), Context.current());
4952
}
5053
}
5154
}

sdk/metrics/src/jmh/java/io/opentelemetry/sdk/metrics/internal/aggregator/HistogramScaleBenchmark.java

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,8 @@
55

66
package io.opentelemetry.sdk.metrics.internal.aggregator;
77

8+
import io.opentelemetry.api.common.Attributes;
9+
import io.opentelemetry.context.Context;
810
import java.util.concurrent.TimeUnit;
911
import java.util.function.DoubleSupplier;
1012
import org.openjdk.jmh.annotations.Benchmark;
@@ -39,7 +41,7 @@ public class HistogramScaleBenchmark {
3941
public static class ThreadState {
4042
@Param HistogramValueGenerator valueGen;
4143
@Param HistogramAggregationParam aggregation;
42-
private AggregatorHandle<?, ?> aggregatorHandle;
44+
private AggregatorHandle<?> aggregatorHandle;
4345
private DoubleSupplier valueSupplier;
4446

4547
@Setup(Level.Invocation)
@@ -51,7 +53,8 @@ public final void setup() {
5153
public void record() {
5254
// Record a number of samples.
5355
for (int i = 0; i < 20000; i++) {
54-
this.aggregatorHandle.recordDouble(valueSupplier.getAsDouble());
56+
this.aggregatorHandle.recordDouble(
57+
valueSupplier.getAsDouble(), Attributes.empty(), Context.current());
5558
}
5659
}
5760
}

sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/SdkMeter.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -304,7 +304,7 @@ WriteableMetricStorage registerSynchronousMetricStorage(InstrumentDescriptor ins
304304
/** Register new asynchronous storage associated with a given instrument. */
305305
SdkObservableMeasurement registerObservableMeasurement(
306306
InstrumentDescriptor instrumentDescriptor) {
307-
List<AsynchronousMetricStorage<?, ?>> registeredStorages = new ArrayList<>();
307+
List<AsynchronousMetricStorage<?>> registeredStorages = new ArrayList<>();
308308
for (Map.Entry<RegisteredReader, MetricStorageRegistry> entry :
309309
readerStorageRegistries.entrySet()) {
310310
RegisteredReader reader = entry.getKey();

sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/aggregator/AbstractSumAggregator.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@
1111
import io.opentelemetry.sdk.metrics.internal.descriptor.InstrumentDescriptor;
1212

1313
abstract class AbstractSumAggregator<T extends PointData, U extends ExemplarData>
14-
implements Aggregator<T, U> {
14+
implements Aggregator<T> {
1515
private final boolean isMonotonic;
1616

1717
AbstractSumAggregator(InstrumentDescriptor instrumentDescriptor) {

sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/aggregator/Aggregator.java

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -7,8 +7,6 @@
77

88
import io.opentelemetry.sdk.common.InstrumentationScopeInfo;
99
import io.opentelemetry.sdk.metrics.data.AggregationTemporality;
10-
import io.opentelemetry.sdk.metrics.data.DoubleExemplarData;
11-
import io.opentelemetry.sdk.metrics.data.ExemplarData;
1210
import io.opentelemetry.sdk.metrics.data.MetricData;
1311
import io.opentelemetry.sdk.metrics.data.MetricDataType;
1412
import io.opentelemetry.sdk.metrics.data.PointData;
@@ -25,9 +23,9 @@
2523
* at any time.
2624
*/
2725
@Immutable
28-
public interface Aggregator<T extends PointData, U extends ExemplarData> {
26+
public interface Aggregator<T extends PointData> {
2927
/** Returns the drop aggregator, an aggregator that drops measurements. */
30-
static Aggregator<?, DoubleExemplarData> drop() {
28+
static Aggregator<?> drop() {
3129
return DropAggregator.INSTANCE;
3230
}
3331

@@ -37,7 +35,7 @@ static Aggregator<?, DoubleExemplarData> drop() {
3735
*
3836
* @return a new {@link AggregatorHandle}.
3937
*/
40-
AggregatorHandle<T, U> createHandle();
38+
AggregatorHandle<T> createHandle();
4139

4240
/**
4341
* Returns a new DELTA point by computing the difference between two cumulative points.

sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/aggregator/AggregatorFactory.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,6 @@
66
package io.opentelemetry.sdk.metrics.internal.aggregator;
77

88
import io.opentelemetry.sdk.common.export.MemoryMode;
9-
import io.opentelemetry.sdk.metrics.data.ExemplarData;
109
import io.opentelemetry.sdk.metrics.data.PointData;
1110
import io.opentelemetry.sdk.metrics.internal.descriptor.InstrumentDescriptor;
1211
import io.opentelemetry.sdk.metrics.internal.exemplar.ExemplarFilter;
@@ -31,7 +30,7 @@ public interface AggregatorFactory {
3130
* @return a new {@link Aggregator}. {@link Aggregator#drop()} indicates no measurements should be
3231
* recorded.
3332
*/
34-
<T extends PointData, U extends ExemplarData> Aggregator<T, U> createAggregator(
33+
<T extends PointData> Aggregator<T> createAggregator(
3534
InstrumentDescriptor instrumentDescriptor,
3635
ExemplarFilter exemplarFilter,
3736
MemoryMode memoryMode);

sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/aggregator/AggregatorHandle.java

Lines changed: 76 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -7,10 +7,14 @@
77

88
import io.opentelemetry.api.common.Attributes;
99
import io.opentelemetry.context.Context;
10-
import io.opentelemetry.sdk.metrics.data.ExemplarData;
10+
import io.opentelemetry.sdk.metrics.data.DoubleExemplarData;
11+
import io.opentelemetry.sdk.metrics.data.LongExemplarData;
1112
import io.opentelemetry.sdk.metrics.data.PointData;
12-
import io.opentelemetry.sdk.metrics.internal.exemplar.ExemplarReservoir;
13+
import io.opentelemetry.sdk.metrics.internal.exemplar.DoubleExemplarReservoir;
14+
import io.opentelemetry.sdk.metrics.internal.exemplar.ExemplarReservoirFactory;
15+
import io.opentelemetry.sdk.metrics.internal.exemplar.LongExemplarReservoir;
1316
import java.util.List;
17+
import javax.annotation.Nullable;
1418
import javax.annotation.concurrent.ThreadSafe;
1519

1620
/**
@@ -24,14 +28,28 @@
2428
* at any time.
2529
*/
2630
@ThreadSafe
27-
public abstract class AggregatorHandle<T extends PointData, U extends ExemplarData> {
31+
public abstract class AggregatorHandle<T extends PointData> {
32+
33+
private static final String UNSUPPORTED_LONG_MESSAGE =
34+
"This aggregator does not support long values.";
35+
private static final String UNSUPPORTED_DOUBLE_MESSAGE =
36+
"This aggregator does not support double values.";
2837

2938
// A reservoir of sampled exemplars for this time period.
30-
private final ExemplarReservoir<U> exemplarReservoir;
39+
@Nullable private final DoubleExemplarReservoir doubleReservoirFactory;
40+
@Nullable private final LongExemplarReservoir longReservoirFactory;
41+
private final boolean isDoubleType;
3142
private volatile boolean valuesRecorded = false;
3243

33-
protected AggregatorHandle(ExemplarReservoir<U> exemplarReservoir) {
34-
this.exemplarReservoir = exemplarReservoir;
44+
protected AggregatorHandle(ExemplarReservoirFactory reservoirFactory) {
45+
this.isDoubleType = isDoubleType();
46+
if (isDoubleType) {
47+
this.doubleReservoirFactory = reservoirFactory.createDoubleExemplarReservoir();
48+
this.longReservoirFactory = null;
49+
} else {
50+
this.doubleReservoirFactory = null;
51+
this.longReservoirFactory = reservoirFactory.createLongExemplarReservoir();
52+
}
3553
}
3654

3755
/**
@@ -44,35 +62,60 @@ public final T aggregateThenMaybeReset(
4462
valuesRecorded = false;
4563
}
4664

47-
return doAggregateThenMaybeReset(
65+
if (isDoubleType) {
66+
return doAggregateThenMaybeResetDoubles(
67+
startEpochNanos,
68+
epochNanos,
69+
attributes,
70+
throwUnsupportedIfNull(this.doubleReservoirFactory, UNSUPPORTED_DOUBLE_MESSAGE)
71+
.collectAndResetDoubles(attributes),
72+
reset);
73+
}
74+
return doAggregateThenMaybeResetLongs(
4875
startEpochNanos,
4976
epochNanos,
5077
attributes,
51-
exemplarReservoir.collectAndReset(attributes),
78+
throwUnsupportedIfNull(this.longReservoirFactory, UNSUPPORTED_LONG_MESSAGE)
79+
.collectAndResetLongs(attributes),
5280
reset);
5381
}
5482

83+
/**
84+
* Indicates whether this {@link AggregatorHandle} supports double or long values.
85+
*
86+
* <p>If it supports doubles, it MUST implement {@link #doAggregateThenMaybeResetDoubles(long,
87+
* long, Attributes, List, boolean)} and {@link #doRecordDouble(double)}.
88+
*
89+
* <p>If it supports long, it MUST implement {@link #doAggregateThenMaybeResetLongs(long, long,
90+
* Attributes, List, boolean)} and {@link #doRecordLong(long)}.
91+
*
92+
* @return true if it supports doubles, false if it supports longs.
93+
*/
94+
protected abstract boolean isDoubleType();
95+
5596
/** Implementation of the {@link #aggregateThenMaybeReset(long, long, Attributes, boolean)} . */
56-
protected abstract T doAggregateThenMaybeReset(
97+
protected T doAggregateThenMaybeResetDoubles(
5798
long startEpochNanos,
5899
long epochNanos,
59100
Attributes attributes,
60-
List<U> exemplars,
61-
boolean reset);
101+
List<DoubleExemplarData> exemplars,
102+
boolean reset) {
103+
throw new UnsupportedOperationException(UNSUPPORTED_DOUBLE_MESSAGE);
104+
}
62105

63-
public final void recordLong(long value, Attributes attributes, Context context) {
64-
exemplarReservoir.offerLongMeasurement(value, attributes, context);
65-
recordLong(value);
106+
/** Implementation of the {@link #aggregateThenMaybeReset(long, long, Attributes, boolean)} . */
107+
protected T doAggregateThenMaybeResetLongs(
108+
long startEpochNanos,
109+
long epochNanos,
110+
Attributes attributes,
111+
List<LongExemplarData> exemplars,
112+
boolean reset) {
113+
throw new UnsupportedOperationException(UNSUPPORTED_LONG_MESSAGE);
66114
}
67115

68-
/**
69-
* Updates the current aggregator with a newly recorded {@code long} value.
70-
*
71-
* <p>Visible for Testing
72-
*
73-
* @param value the new {@code long} value to be added.
74-
*/
75-
public final void recordLong(long value) {
116+
public void recordLong(long value, Attributes attributes, Context context) {
117+
throwUnsupportedIfNull(this.longReservoirFactory, UNSUPPORTED_LONG_MESSAGE)
118+
.offerLongMeasurement(value, attributes, context);
76119
doRecordLong(value);
77120
valuesRecorded = true;
78121
}
@@ -82,23 +125,12 @@ public final void recordLong(long value) {
82125
* values.
83126
*/
84127
protected void doRecordLong(long value) {
85-
throw new UnsupportedOperationException(
86-
"This aggregator does not support recording long values.");
128+
throw new UnsupportedOperationException("This aggregator does not support long values.");
87129
}
88130

89131
public final void recordDouble(double value, Attributes attributes, Context context) {
90-
exemplarReservoir.offerDoubleMeasurement(value, attributes, context);
91-
recordDouble(value);
92-
}
93-
94-
/**
95-
* Updates the current aggregator with a newly recorded {@code double} value.
96-
*
97-
* <p>Visible for Testing
98-
*
99-
* @param value the new {@code double} value to be added.
100-
*/
101-
public final void recordDouble(double value) {
132+
throwUnsupportedIfNull(this.doubleReservoirFactory, UNSUPPORTED_DOUBLE_MESSAGE)
133+
.offerDoubleMeasurement(value, attributes, context);
102134
doRecordDouble(value);
103135
valuesRecorded = true;
104136
}
@@ -108,8 +140,7 @@ public final void recordDouble(double value) {
108140
* double values.
109141
*/
110142
protected void doRecordDouble(double value) {
111-
throw new UnsupportedOperationException(
112-
"This aggregator does not support recording double values.");
143+
throw new UnsupportedOperationException(UNSUPPORTED_DOUBLE_MESSAGE);
113144
}
114145

115146
/**
@@ -120,4 +151,11 @@ protected void doRecordDouble(double value) {
120151
public boolean hasRecordedValues() {
121152
return valuesRecorded;
122153
}
154+
155+
private static <S> S throwUnsupportedIfNull(@Nullable S value, String message) {
156+
if (value == null) {
157+
throw new UnsupportedOperationException(message);
158+
}
159+
return value;
160+
}
123161
}

0 commit comments

Comments
 (0)