@@ -62,7 +62,7 @@ public final class AsynchronousMetricStorage<T extends PointData, U extends Exem
6262 private final int maxCardinality ;
6363
6464 // Handles responsible for aggregating data recorded during callbacks
65- private Map <Attributes , AggregatorHandle <T , U >> aggregatorHandles ;
65+ private final Map <Attributes , AggregatorHandle <T , U >> aggregatorHandles ;
6666
6767 // Only populated if aggregationTemporality == DELTA
6868 private Map <Attributes , T > lastPoints ;
@@ -190,44 +190,54 @@ public MetricData collect(
190190 InstrumentationScopeInfo instrumentationScopeInfo ,
191191 long startEpochNanos ,
192192 long epochNanos ) {
193- if (memoryMode == REUSABLE_DATA ) {
194- // Collect can not run concurrently for same reader, hence we safely assume
195- // the previous collect result has been used and done with
196- reusablePointsList .forEach (reusablePointsPool ::returnObject );
197- reusablePointsList .clear ();
198-
199- reusablePointsMap .forEach ((key , value ) -> reusablePointsPool .returnObject (value ));
200- reusablePointsMap .clear ();
201- }
202-
203193 Collection <T > result =
204194 aggregationTemporality == AggregationTemporality .DELTA
205195 ? collectWithDeltaAggregationTemporality ()
206196 : collectWithCumulativeAggregationTemporality ();
207197
208- if (memoryMode = = REUSABLE_DATA ) {
198+ if (memoryMode ! = REUSABLE_DATA ) {
209199 aggregatorHandles .clear ();
210- } else {
211- aggregatorHandles = new HashMap <>();
212200 }
213201
214202 return aggregator .toMetricData (
215203 resource , instrumentationScopeInfo , metricDescriptor , result , aggregationTemporality );
216204 }
217205
218206 private Collection <T > collectWithDeltaAggregationTemporality () {
219- Map <Attributes , T > currentPoints =
220- memoryMode == REUSABLE_DATA ? reusablePointsMap : new HashMap <>();
207+ Map <Attributes , T > currentPoints ;
208+ if (memoryMode == REUSABLE_DATA ) {
209+ // deltaPoints computed in the previous collection can be released
210+ reusablePointsList .forEach (reusablePointsPool ::returnObject );
211+ reusablePointsList .clear ();
212+
213+ currentPoints = reusablePointsMap ;
214+ } else {
215+ currentPoints = new HashMap <>();
216+ }
217+
221218 aggregatorHandles .forEach (
222219 (attributes , handle ) -> {
223- T value =
220+ if (!handle .hasRecordedValues ()) {
221+ return ;
222+ }
223+
224+ T point =
224225 handle .aggregateThenMaybeReset (
225226 AsynchronousMetricStorage .this .startEpochNanos ,
226227 AsynchronousMetricStorage .this .epochNanos ,
227228 attributes ,
228- // No need to reset, aggregatorHandles is going to be cleared anyways
229- /* reset= */ false );
230- currentPoints .put (attributes , value );
229+ /* reset= */ true );
230+
231+ T pointForCurrentPoints ;
232+ if (memoryMode == REUSABLE_DATA ) {
233+ // AggregatorHandle is going to modify the point eventually, but we must persist its
234+ // value to used it at the next collection (within lastPoints). Thus, we make a copy.
235+ pointForCurrentPoints = reusablePointsPool .borrowObject ();
236+ aggregator .copyPoint (point , pointForCurrentPoints );
237+ } else {
238+ pointForCurrentPoints = point ;
239+ }
240+ currentPoints .put (attributes , pointForCurrentPoints );
231241 });
232242
233243 List <T > deltaPoints = memoryMode == REUSABLE_DATA ? reusablePointsList : new ArrayList <>();
@@ -237,24 +247,33 @@ private Collection<T> collectWithDeltaAggregationTemporality() {
237247
238248 T deltaPoint ;
239249 if (lastPoint == null ) {
240- deltaPoint = currentPoint ;
250+ if (memoryMode == REUSABLE_DATA ) {
251+ // All deltaPoints are released at the end of the collection. Thus, we need a copy
252+ // to make sure currentPoint can still be used within lastPoints during the next
253+ // collection.
254+ deltaPoint = reusablePointsPool .borrowObject ();
255+ aggregator .copyPoint (currentPoint , deltaPoint );
256+ } else {
257+ deltaPoint = currentPoint ;
258+ }
241259 } else {
242260 if (memoryMode == REUSABLE_DATA ) {
243261 aggregator .diffInPlace (lastPoint , currentPoint );
244262 deltaPoint = lastPoint ;
245-
246- // Remaining last points are returned to reusablePointsPool, but
247- // this reusable point is still used, so don't return it to pool yet
248- lastPoints .remove (attributes );
249263 } else {
250264 deltaPoint = aggregator .diff (lastPoint , currentPoint );
251265 }
252266 }
253-
254267 deltaPoints .add (deltaPoint );
255268 });
256269
257270 if (memoryMode == REUSABLE_DATA ) {
271+ // lastPoints for the current collection can be discarded when the collection is completed.
272+ // They can be returned to the pool because they're not managed by the AggregatorHandle,
273+ // we made a copy.
274+ lastPoints .forEach ((attributes , point ) -> reusablePointsPool .returnObject (point ));
275+ lastPoints .clear ();
276+
258277 Map <Attributes , T > tmp = lastPoints ;
259278 lastPoints = reusablePointsMap ;
260279 reusablePointsMap = tmp ;
@@ -266,16 +285,28 @@ private Collection<T> collectWithDeltaAggregationTemporality() {
266285 }
267286
268287 private Collection <T > collectWithCumulativeAggregationTemporality () {
269- List <T > currentPoints = memoryMode == REUSABLE_DATA ? reusablePointsList : new ArrayList <>();
288+ List <T > currentPoints ;
289+ if (memoryMode == REUSABLE_DATA ) {
290+ // We should not return the points in this list to the pool, they belong to the
291+ // AggregatorHandle
292+ reusablePointsList .clear ();
293+ currentPoints = reusablePointsList ;
294+ } else {
295+ currentPoints = new ArrayList <>();
296+ }
297+
270298 aggregatorHandles .forEach (
271299 (attributes , handle ) -> {
300+ if (!handle .hasRecordedValues ()) {
301+ return ;
302+ }
303+
272304 T value =
273305 handle .aggregateThenMaybeReset (
274306 AsynchronousMetricStorage .this .startEpochNanos ,
275307 AsynchronousMetricStorage .this .epochNanos ,
276308 attributes ,
277- // No need to reset, aggregatorHandles is going to be cleared anyways
278- /* reset= */ false );
309+ /* reset= */ true );
279310 currentPoints .add (value );
280311 });
281312 return currentPoints ;
0 commit comments