Skip to content

Commit 4e566d7

Browse files
committed
pool for handles
1 parent 7115187 commit 4e566d7

File tree

1 file changed

+11
-50
lines changed

1 file changed

+11
-50
lines changed

sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/internal/state/AsynchronousMetricStorage.java

Lines changed: 11 additions & 50 deletions
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,6 @@
3434
import java.util.HashMap;
3535
import java.util.List;
3636
import java.util.Map;
37-
import java.util.function.BiConsumer;
3837
import java.util.logging.Level;
3938
import java.util.logging.Logger;
4039

@@ -82,9 +81,6 @@ public final class AsynchronousMetricStorage<T extends PointData, U extends Exem
8281
private long startEpochNanos;
8382
private long epochNanos;
8483

85-
// Delete the first empty handle to reclaim space, and return quickly for all subsequent entries
86-
private HandlesDeleter handlesDeleter;
87-
8884
private AsynchronousMetricStorage(
8985
RegisteredReader registeredReader,
9086
MetricDescriptor metricDescriptor,
@@ -103,7 +99,6 @@ private AsynchronousMetricStorage(
10399
this.maxCardinality = maxCardinality - 1;
104100
this.reusablePointsPool = new ObjectPool<>(aggregator::createReusablePoint);
105101
this.reusableHandlesPool = new ObjectPool<>(aggregator::createHandle);
106-
this.handlesDeleter = new HandlesDeleter();
107102

108103
if (memoryMode == REUSABLE_DATA) {
109104
this.lastPoints = new PooledHashMap<>();
@@ -169,23 +164,14 @@ private Attributes validateAndProcessAttributes(Attributes attributes) {
169164
attributes = attributesProcessor.process(attributes, context);
170165

171166
if (aggregatorHandles.size() >= maxCardinality) {
172-
aggregatorHandles.forEach(handlesDeleter);
173-
if (memoryMode == REUSABLE_DATA) {
174-
handlesDeleter.reset();
175-
} else {
176-
handlesDeleter = new HandlesDeleter();
177-
}
178-
179-
if (aggregatorHandles.size() >= maxCardinality) {
180-
throttlingLogger.log(
181-
Level.WARNING,
182-
"Instrument "
183-
+ metricDescriptor.getSourceInstrument().getName()
184-
+ " has exceeded the maximum allowed cardinality ("
185-
+ maxCardinality
186-
+ ").");
187-
return MetricStorage.CARDINALITY_OVERFLOW;
188-
}
167+
throttlingLogger.log(
168+
Level.WARNING,
169+
"Instrument "
170+
+ metricDescriptor.getSourceInstrument().getName()
171+
+ " has exceeded the maximum allowed cardinality ("
172+
+ maxCardinality
173+
+ ").");
174+
return MetricStorage.CARDINALITY_OVERFLOW;
189175
}
190176
return attributes;
191177
}
@@ -211,9 +197,9 @@ public MetricData collect(
211197
? collectWithDeltaAggregationTemporality()
212198
: collectWithCumulativeAggregationTemporality();
213199

214-
if (memoryMode != REUSABLE_DATA) {
215-
aggregatorHandles.clear();
216-
}
200+
// collectWith*AggregationTemporality() methods are responsible for resetting the handle
201+
aggregatorHandles.forEach((attribute, handle) -> reusableHandlesPool.returnObject(handle));
202+
aggregatorHandles.clear();
217203

218204
return aggregator.toMetricData(
219205
resource, instrumentationScopeInfo, metricDescriptor, result, aggregationTemporality);
@@ -233,10 +219,6 @@ private Collection<T> collectWithDeltaAggregationTemporality() {
233219

234220
aggregatorHandles.forEach(
235221
(attributes, handle) -> {
236-
if (!handle.hasRecordedValues()) {
237-
return;
238-
}
239-
240222
T point =
241223
handle.aggregateThenMaybeReset(
242224
AsynchronousMetricStorage.this.startEpochNanos,
@@ -313,10 +295,6 @@ private Collection<T> collectWithCumulativeAggregationTemporality() {
313295

314296
aggregatorHandles.forEach(
315297
(attributes, handle) -> {
316-
if (!handle.hasRecordedValues()) {
317-
return;
318-
}
319-
320298
T value =
321299
handle.aggregateThenMaybeReset(
322300
AsynchronousMetricStorage.this.startEpochNanos,
@@ -332,21 +310,4 @@ private Collection<T> collectWithCumulativeAggregationTemporality() {
332310
public boolean isEmpty() {
333311
return aggregator == Aggregator.drop();
334312
}
335-
336-
private class HandlesDeleter implements BiConsumer<Attributes, AggregatorHandle<T, U>> {
337-
private boolean active = true;
338-
339-
@Override
340-
public void accept(Attributes attributes, AggregatorHandle<T, U> handle) {
341-
if (active && !handle.hasRecordedValues()) {
342-
AggregatorHandle<T, U> aggregatorHandle = aggregatorHandles.remove(attributes);
343-
reusableHandlesPool.returnObject(aggregatorHandle);
344-
active = false;
345-
}
346-
}
347-
348-
private void reset() {
349-
active = true;
350-
}
351-
}
352313
}

0 commit comments

Comments
 (0)