Skip to content

Commit b574b8b

Browse files
committed
Break out DoubleExemplarReservoir, LongExemplarReservoir, add ExemplarReservoirFactory
1 parent 6d00153 commit b574b8b

File tree

38 files changed

+640
-513
lines changed

38 files changed

+640
-513
lines changed
Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,2 +1,2 @@
1-
Comparing source compatibility of opentelemetry-common-1.55.0-SNAPSHOT.jar against opentelemetry-common-1.54.1.jar
1+
Comparing source compatibility of opentelemetry-common-1.55.0-SNAPSHOT.jar against opentelemetry-common-1.55.0.jar
22
No changes.
Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,2 +1,2 @@
1-
Comparing source compatibility of opentelemetry-sdk-metrics-1.55.0-SNAPSHOT.jar against opentelemetry-sdk-metrics-1.54.1.jar
1+
Comparing source compatibility of opentelemetry-sdk-metrics-1.55.0-SNAPSHOT.jar against opentelemetry-sdk-metrics-1.55.0.jar
22
No changes.

sdk/metrics/src/jmh/java/io/opentelemetry/sdk/metrics/internal/aggregator/HistogramAggregationParam.java

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@
77

88
import static io.opentelemetry.sdk.common.export.MemoryMode.IMMUTABLE_DATA;
99

10-
import io.opentelemetry.sdk.metrics.internal.exemplar.ExemplarReservoir;
10+
import io.opentelemetry.sdk.metrics.internal.exemplar.ExemplarReservoirFactory;
1111
import java.util.Collections;
1212

1313
/** The types of histogram aggregation to benchmark. */
@@ -17,19 +17,19 @@ public enum HistogramAggregationParam {
1717
new DoubleExplicitBucketHistogramAggregator(
1818
ExplicitBucketHistogramUtils.createBoundaryArray(
1919
ExplicitBucketHistogramUtils.DEFAULT_HISTOGRAM_BUCKET_BOUNDARIES),
20-
ExemplarReservoir::noSamples,
20+
ExemplarReservoirFactory.noSamples(),
2121
IMMUTABLE_DATA)),
2222
EXPLICIT_SINGLE_BUCKET(
2323
new DoubleExplicitBucketHistogramAggregator(
2424
ExplicitBucketHistogramUtils.createBoundaryArray(Collections.emptyList()),
25-
ExemplarReservoir::noSamples,
25+
ExemplarReservoirFactory.noSamples(),
2626
IMMUTABLE_DATA)),
2727
EXPONENTIAL_SMALL_CIRCULAR_BUFFER(
2828
new DoubleBase2ExponentialHistogramAggregator(
29-
ExemplarReservoir::noSamples, 20, 0, IMMUTABLE_DATA)),
29+
ExemplarReservoirFactory.noSamples(), 20, 0, IMMUTABLE_DATA)),
3030
EXPONENTIAL_CIRCULAR_BUFFER(
3131
new DoubleBase2ExponentialHistogramAggregator(
32-
ExemplarReservoir::noSamples, 160, 0, IMMUTABLE_DATA));
32+
ExemplarReservoirFactory.noSamples(), 160, 0, IMMUTABLE_DATA));
3333

3434
private final Aggregator<?> aggregator;
3535

sdk/metrics/src/jmh/java/io/opentelemetry/sdk/metrics/internal/aggregator/HistogramBenchmark.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,8 @@
55

66
package io.opentelemetry.sdk.metrics.internal.aggregator;
77

8+
import io.opentelemetry.api.common.Attributes;
9+
import io.opentelemetry.context.Context;
810
import java.util.concurrent.TimeUnit;
911
import java.util.function.DoubleSupplier;
1012
import org.openjdk.jmh.annotations.Benchmark;
@@ -45,7 +47,8 @@ public final void setup() {
4547
public void record() {
4648
// Record a number of samples.
4749
for (int i = 0; i < 2000; i++) {
48-
this.aggregatorHandle.recordDouble(valueSupplier.getAsDouble());
50+
this.aggregatorHandle.recordDouble(
51+
valueSupplier.getAsDouble(), Attributes.empty(), Context.current());
4952
}
5053
}
5154
}

sdk/metrics/src/jmh/java/io/opentelemetry/sdk/metrics/internal/aggregator/HistogramScaleBenchmark.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,8 @@
55

66
package io.opentelemetry.sdk.metrics.internal.aggregator;
77

8+
import io.opentelemetry.api.common.Attributes;
9+
import io.opentelemetry.context.Context;
810
import java.util.concurrent.TimeUnit;
911
import java.util.function.DoubleSupplier;
1012
import org.openjdk.jmh.annotations.Benchmark;
@@ -51,7 +53,8 @@ public final void setup() {
5153
public void record() {
5254
// Record a number of samples.
5355
for (int i = 0; i < 20000; i++) {
54-
this.aggregatorHandle.recordDouble(valueSupplier.getAsDouble());
56+
this.aggregatorHandle.recordDouble(
57+
valueSupplier.getAsDouble(), Attributes.empty(), Context.current());
5558
}
5659
}
5760
}

sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/aggregator/AggregatorHandle.java

Lines changed: 39 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -10,8 +10,11 @@
1010
import io.opentelemetry.sdk.metrics.data.DoubleExemplarData;
1111
import io.opentelemetry.sdk.metrics.data.LongExemplarData;
1212
import io.opentelemetry.sdk.metrics.data.PointData;
13-
import io.opentelemetry.sdk.metrics.internal.exemplar.ExemplarReservoir;
13+
import io.opentelemetry.sdk.metrics.internal.exemplar.DoubleExemplarReservoir;
14+
import io.opentelemetry.sdk.metrics.internal.exemplar.ExemplarReservoirFactory;
15+
import io.opentelemetry.sdk.metrics.internal.exemplar.LongExemplarReservoir;
1416
import java.util.List;
17+
import javax.annotation.Nullable;
1518
import javax.annotation.concurrent.ThreadSafe;
1619

1720
/**
@@ -27,14 +30,26 @@
2730
@ThreadSafe
2831
public abstract class AggregatorHandle<T extends PointData> {
2932

33+
private static final String UNSUPPORTED_LONG_MESSAGE =
34+
"This aggregator does not support long values.";
35+
private static final String UNSUPPORTED_DOUBLE_MESSAGE =
36+
"This aggregator does not support double values.";
37+
3038
// A reservoir of sampled exemplars for this time period.
31-
private final ExemplarReservoir exemplarReservoir;
32-
private volatile boolean valuesRecorded = false;
39+
@Nullable private final DoubleExemplarReservoir doubleReservoirFactory;
40+
@Nullable private final LongExemplarReservoir longReservoirFactory;
3341
private final boolean isDoubleType;
42+
private volatile boolean valuesRecorded = false;
3443

35-
protected AggregatorHandle(ExemplarReservoir exemplarReservoir) {
36-
this.exemplarReservoir = exemplarReservoir;
44+
protected AggregatorHandle(ExemplarReservoirFactory reservoirFactory) {
3745
this.isDoubleType = isDoubleType();
46+
if (isDoubleType) {
47+
this.doubleReservoirFactory = reservoirFactory.createDoubleExemplarReservoir();
48+
this.longReservoirFactory = null;
49+
} else {
50+
this.doubleReservoirFactory = null;
51+
this.longReservoirFactory = reservoirFactory.createLongExemplarReservoir();
52+
}
3853
}
3954

4055
/**
@@ -52,14 +67,16 @@ public final T aggregateThenMaybeReset(
5267
startEpochNanos,
5368
epochNanos,
5469
attributes,
55-
exemplarReservoir.collectAndResetDoubles(attributes),
70+
throwUnsupportedIfNull(this.doubleReservoirFactory, UNSUPPORTED_DOUBLE_MESSAGE)
71+
.collectAndResetDoubles(attributes),
5672
reset);
5773
}
5874
return doAggregateThenMaybeResetLongs(
5975
startEpochNanos,
6076
epochNanos,
6177
attributes,
62-
exemplarReservoir.collectAndResetLongs(attributes),
78+
throwUnsupportedIfNull(this.longReservoirFactory, UNSUPPORTED_LONG_MESSAGE)
79+
.collectAndResetLongs(attributes),
6380
reset);
6481
}
6582

@@ -83,7 +100,7 @@ protected T doAggregateThenMaybeResetDoubles(
83100
Attributes attributes,
84101
List<DoubleExemplarData> exemplars,
85102
boolean reset) {
86-
throw new UnsupportedOperationException("This aggregator does not support double values.");
103+
throw new UnsupportedOperationException(UNSUPPORTED_DOUBLE_MESSAGE);
87104
}
88105

89106
/** Implementation of the {@link #aggregateThenMaybeReset(long, long, Attributes, boolean)} . */
@@ -93,22 +110,12 @@ protected T doAggregateThenMaybeResetLongs(
93110
Attributes attributes,
94111
List<LongExemplarData> exemplars,
95112
boolean reset) {
96-
throw new UnsupportedOperationException("This aggregator does not support long values.");
113+
throw new UnsupportedOperationException(UNSUPPORTED_LONG_MESSAGE);
97114
}
98115

99-
public final void recordLong(long value, Attributes attributes, Context context) {
100-
exemplarReservoir.offerLongMeasurement(value, attributes, context);
101-
recordLong(value);
102-
}
103-
104-
/**
105-
* Updates the current aggregator with a newly recorded {@code long} value.
106-
*
107-
* <p>Visible for Testing
108-
*
109-
* @param value the new {@code long} value to be added.
110-
*/
111-
public final void recordLong(long value) {
116+
public void recordLong(long value, Attributes attributes, Context context) {
117+
throwUnsupportedIfNull(this.longReservoirFactory, UNSUPPORTED_LONG_MESSAGE)
118+
.offerLongMeasurement(value, attributes, context);
112119
doRecordLong(value);
113120
valuesRecorded = true;
114121
}
@@ -122,18 +129,8 @@ protected void doRecordLong(long value) {
122129
}
123130

124131
public final void recordDouble(double value, Attributes attributes, Context context) {
125-
exemplarReservoir.offerDoubleMeasurement(value, attributes, context);
126-
recordDouble(value);
127-
}
128-
129-
/**
130-
* Updates the current aggregator with a newly recorded {@code double} value.
131-
*
132-
* <p>Visible for Testing
133-
*
134-
* @param value the new {@code double} value to be added.
135-
*/
136-
public final void recordDouble(double value) {
132+
throwUnsupportedIfNull(this.doubleReservoirFactory, UNSUPPORTED_DOUBLE_MESSAGE)
133+
.offerDoubleMeasurement(value, attributes, context);
137134
doRecordDouble(value);
138135
valuesRecorded = true;
139136
}
@@ -143,7 +140,7 @@ public final void recordDouble(double value) {
143140
* double values.
144141
*/
145142
protected void doRecordDouble(double value) {
146-
throw new UnsupportedOperationException("This aggregator does not support double values.");
143+
throw new UnsupportedOperationException(UNSUPPORTED_DOUBLE_MESSAGE);
147144
}
148145

149146
/**
@@ -154,4 +151,11 @@ protected void doRecordDouble(double value) {
154151
public boolean hasRecordedValues() {
155152
return valuesRecorded;
156153
}
154+
155+
private static <S> S throwUnsupportedIfNull(@Nullable S value, String message) {
156+
if (value == null) {
157+
throw new UnsupportedOperationException(message);
158+
}
159+
return value;
160+
}
157161
}

sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/aggregator/DoubleBase2ExponentialHistogramAggregator.java

Lines changed: 15 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
package io.opentelemetry.sdk.metrics.internal.aggregator;
77

88
import io.opentelemetry.api.common.Attributes;
9+
import io.opentelemetry.context.Context;
910
import io.opentelemetry.sdk.common.InstrumentationScopeInfo;
1011
import io.opentelemetry.sdk.common.export.MemoryMode;
1112
import io.opentelemetry.sdk.internal.DynamicPrimitiveLongList;
@@ -21,11 +22,10 @@
2122
import io.opentelemetry.sdk.metrics.internal.data.MutableExponentialHistogramBuckets;
2223
import io.opentelemetry.sdk.metrics.internal.data.MutableExponentialHistogramPointData;
2324
import io.opentelemetry.sdk.metrics.internal.descriptor.MetricDescriptor;
24-
import io.opentelemetry.sdk.metrics.internal.exemplar.ExemplarReservoir;
25+
import io.opentelemetry.sdk.metrics.internal.exemplar.ExemplarReservoirFactory;
2526
import io.opentelemetry.sdk.resources.Resource;
2627
import java.util.Collection;
2728
import java.util.List;
28-
import java.util.function.Supplier;
2929
import javax.annotation.Nullable;
3030

3131
/**
@@ -37,30 +37,30 @@
3737
public final class DoubleBase2ExponentialHistogramAggregator
3838
implements Aggregator<ExponentialHistogramPointData> {
3939

40-
private final Supplier<ExemplarReservoir> reservoirSupplier;
40+
private final ExemplarReservoirFactory reservoirFactory;
4141
private final int maxBuckets;
4242
private final int maxScale;
4343
private final MemoryMode memoryMode;
4444

4545
/**
4646
* Constructs an exponential histogram aggregator.
4747
*
48-
* @param reservoirSupplier Supplier of exemplar reservoirs per-stream.
48+
* @param reservoirFactory Supplier of exemplar reservoirs per-stream.
4949
*/
5050
public DoubleBase2ExponentialHistogramAggregator(
51-
Supplier<ExemplarReservoir> reservoirSupplier,
51+
ExemplarReservoirFactory reservoirFactory,
5252
int maxBuckets,
5353
int maxScale,
5454
MemoryMode memoryMode) {
55-
this.reservoirSupplier = reservoirSupplier;
55+
this.reservoirFactory = reservoirFactory;
5656
this.maxBuckets = maxBuckets;
5757
this.maxScale = maxScale;
5858
this.memoryMode = memoryMode;
5959
}
6060

6161
@Override
6262
public AggregatorHandle<ExponentialHistogramPointData> createHandle() {
63-
return new Handle(reservoirSupplier.get(), maxBuckets, maxScale, memoryMode);
63+
return new Handle(reservoirFactory, maxBuckets, maxScale, memoryMode);
6464
}
6565

6666
@Override
@@ -95,8 +95,12 @@ static final class Handle extends AggregatorHandle<ExponentialHistogramPointData
9595
// Used only when MemoryMode = REUSABLE_DATA
9696
@Nullable private final MutableExponentialHistogramPointData reusablePoint;
9797

98-
Handle(ExemplarReservoir reservoir, int maxBuckets, int maxScale, MemoryMode memoryMode) {
99-
super(reservoir);
98+
Handle(
99+
ExemplarReservoirFactory reservoirFactory,
100+
int maxBuckets,
101+
int maxScale,
102+
MemoryMode memoryMode) {
103+
super(reservoirFactory);
100104
this.maxBuckets = maxBuckets;
101105
this.maxScale = maxScale;
102106
this.sum = 0;
@@ -262,8 +266,8 @@ protected boolean isDoubleType() {
262266
}
263267

264268
@Override
265-
protected void doRecordLong(long value) {
266-
doRecordDouble((double) value);
269+
public void recordLong(long value, Attributes attributes, Context context) {
270+
super.recordDouble((double) value, attributes, context);
267271
}
268272

269273
void downScale(int by) {

sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/aggregator/DoubleExplicitBucketHistogramAggregator.java

Lines changed: 14 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77

88
import io.opentelemetry.api.common.Attributes;
99
import io.opentelemetry.api.internal.GuardedBy;
10+
import io.opentelemetry.context.Context;
1011
import io.opentelemetry.sdk.common.InstrumentationScopeInfo;
1112
import io.opentelemetry.sdk.common.export.MemoryMode;
1213
import io.opentelemetry.sdk.internal.PrimitiveLongList;
@@ -19,14 +20,13 @@
1920
import io.opentelemetry.sdk.metrics.internal.data.ImmutableMetricData;
2021
import io.opentelemetry.sdk.metrics.internal.data.MutableHistogramPointData;
2122
import io.opentelemetry.sdk.metrics.internal.descriptor.MetricDescriptor;
22-
import io.opentelemetry.sdk.metrics.internal.exemplar.ExemplarReservoir;
23+
import io.opentelemetry.sdk.metrics.internal.exemplar.ExemplarReservoirFactory;
2324
import io.opentelemetry.sdk.resources.Resource;
2425
import java.util.ArrayList;
2526
import java.util.Arrays;
2627
import java.util.Collection;
2728
import java.util.Collections;
2829
import java.util.List;
29-
import java.util.function.Supplier;
3030
import javax.annotation.Nullable;
3131

3232
/**
@@ -43,17 +43,17 @@ public final class DoubleExplicitBucketHistogramAggregator
4343
// a cache for converting to MetricData
4444
private final List<Double> boundaryList;
4545

46-
private final Supplier<ExemplarReservoir> reservoirSupplier;
46+
private final ExemplarReservoirFactory reservoirFactory;
4747

4848
/**
4949
* Constructs an explicit bucket histogram aggregator.
5050
*
5151
* @param boundaries Bucket boundaries, in-order.
52-
* @param reservoirSupplier Supplier of exemplar reservoirs per-stream.
52+
* @param reservoirFactory Supplier of exemplar reservoirs per-stream.
5353
* @param memoryMode The {@link MemoryMode} to use in this aggregator.
5454
*/
5555
public DoubleExplicitBucketHistogramAggregator(
56-
double[] boundaries, Supplier<ExemplarReservoir> reservoirSupplier, MemoryMode memoryMode) {
56+
double[] boundaries, ExemplarReservoirFactory reservoirFactory, MemoryMode memoryMode) {
5757
this.boundaries = boundaries;
5858
this.memoryMode = memoryMode;
5959

@@ -62,12 +62,12 @@ public DoubleExplicitBucketHistogramAggregator(
6262
boundaryList.add(v);
6363
}
6464
this.boundaryList = Collections.unmodifiableList(boundaryList);
65-
this.reservoirSupplier = reservoirSupplier;
65+
this.reservoirFactory = reservoirFactory;
6666
}
6767

6868
@Override
6969
public AggregatorHandle<HistogramPointData> createHandle() {
70-
return new Handle(this.boundaryList, this.boundaries, reservoirSupplier.get(), memoryMode);
70+
return new Handle(boundaryList, boundaries, reservoirFactory, memoryMode);
7171
}
7272

7373
@Override
@@ -115,9 +115,9 @@ static final class Handle extends AggregatorHandle<HistogramPointData> {
115115
Handle(
116116
List<Double> boundaryList,
117117
double[] boundaries,
118-
ExemplarReservoir reservoir,
118+
ExemplarReservoirFactory reservoirFactory,
119119
MemoryMode memoryMode) {
120-
super(reservoir);
120+
super(reservoirFactory);
121121
this.boundaryList = boundaryList;
122122
this.boundaries = boundaries;
123123
this.counts = new long[this.boundaries.length + 1];
@@ -130,6 +130,11 @@ static final class Handle extends AggregatorHandle<HistogramPointData> {
130130
}
131131
}
132132

133+
@Override
134+
public void recordLong(long value, Attributes attributes, Context context) {
135+
super.recordDouble((double) value, attributes, context);
136+
}
137+
133138
@Override
134139
protected boolean isDoubleType() {
135140
return true;
@@ -196,10 +201,5 @@ protected void doRecordDouble(double value) {
196201
this.counts[bucketIndex]++;
197202
}
198203
}
199-
200-
@Override
201-
protected void doRecordLong(long value) {
202-
doRecordDouble((double) value);
203-
}
204204
}
205205
}

0 commit comments

Comments
 (0)