|
34 | 34 | import java.util.HashMap; |
35 | 35 | import java.util.List; |
36 | 36 | import java.util.Map; |
| 37 | +import java.util.function.BiConsumer; |
37 | 38 | import java.util.logging.Level; |
38 | 39 | import java.util.logging.Logger; |
39 | 40 |
|
@@ -79,6 +80,20 @@ public final class AsynchronousMetricStorage<T extends PointData, U extends Exem |
79 | 80 | private long startEpochNanos; |
80 | 81 | private long epochNanos; |
81 | 82 |
|
| 83 | + // Delete the first empty handle to reclaim space, and return quickly for all subsequent entries |
| 84 | + private final BiConsumer<Attributes, AggregatorHandle<T, U>> handlesDeleter = |
| 85 | + new BiConsumer<Attributes, AggregatorHandle<T, U>>() { |
| 86 | + private boolean active = true; |
| 87 | + |
| 88 | + @Override |
| 89 | + public void accept(Attributes attributes, AggregatorHandle<T, U> handle) { |
| 90 | + if (active && !handle.hasRecordedValues()) { |
| 91 | + aggregatorHandles.remove(attributes); |
| 92 | + active = false; |
| 93 | + } |
| 94 | + } |
| 95 | + }; |
| 96 | + |
82 | 97 | private AsynchronousMetricStorage( |
83 | 98 | RegisteredReader registeredReader, |
84 | 99 | MetricDescriptor metricDescriptor, |
@@ -160,14 +175,9 @@ private Attributes validateAndProcessAttributes(Attributes attributes) { |
160 | 175 | Context context = Context.current(); |
161 | 176 | attributes = attributesProcessor.process(attributes, context); |
162 | 177 |
|
163 | | - if (aggregatorHandles.size() >= maxCardinality) { |
164 | | - aggregatorHandles.forEach( |
165 | | - (attr, handle) -> { |
166 | | - if (!handle.hasRecordedValues()) { |
167 | | - aggregatorHandles.remove(attr); |
168 | | - } |
169 | | - }); |
170 | | - if (aggregatorHandles.size() >= maxCardinality) { |
| 178 | + if (aggregatorHandles.size() == maxCardinality) { |
| 179 | + aggregatorHandles.forEach(handlesDeleter); |
| 180 | + if (aggregatorHandles.size() == maxCardinality) { |
171 | 181 | throttlingLogger.log( |
172 | 182 | Level.WARNING, |
173 | 183 | "Instrument " |
|
0 commit comments