@@ -69,7 +69,10 @@ public final class AsynchronousMetricStorage<T extends PointData, U extends Exem
6969
7070 // Only populated if memoryMode == REUSABLE_DATA
7171 private final ObjectPool <T > reusablePointsPool ;
72- private final List <T > reusableDeltaPoints = new ArrayList <>();
72+ private final List <T > reusablePointsList = new ArrayList <>();
73+ // If aggregationTemporality == DELTA, this reference and lastPoints will be swapped at every
74+ // collection
75+ private Map <Attributes , T > reusablePointsMap = new PooledHashMap <>();
7376
7477 // Time information relative to recording of data in aggregatorHandles, set while calling
7578 // callbacks
@@ -187,70 +190,91 @@ public MetricData collect(
187190 InstrumentationScopeInfo instrumentationScopeInfo ,
188191 long startEpochNanos ,
189192 long epochNanos ) {
190- boolean reset = aggregationTemporality == AggregationTemporality .DELTA ;
191-
192193 if (memoryMode == REUSABLE_DATA ) {
193194 // Collect can not run concurrently for same reader, hence we safely assume
194195 // the previous collect result has been used and done with
195- reusableDeltaPoints .forEach (reusablePointsPool ::returnObject );
196- reusableDeltaPoints .clear ();
196+ reusablePointsList .forEach (reusablePointsPool ::returnObject );
197+ reusablePointsList .clear ();
198+
199+ reusablePointsMap .forEach ((key , value ) -> reusablePointsPool .returnObject (value ));
200+ reusablePointsMap .clear ();
201+ }
202+
203+ Collection <T > result =
204+ aggregationTemporality == AggregationTemporality .DELTA
205+ ? collectWithDeltaAggregationTemporality ()
206+ : collectWithCumulativeAggregationTemporality ();
207+
208+ if (memoryMode == REUSABLE_DATA ) {
209+ aggregatorHandles .clear ();
210+ } else {
211+ aggregatorHandles = new HashMap <>();
197212 }
198213
199- Map <Attributes , T > currentPoints = new HashMap <>();
214+ return aggregator .toMetricData (
215+ resource , instrumentationScopeInfo , metricDescriptor , result , aggregationTemporality );
216+ }
217+
218+ private Collection <T > collectWithDeltaAggregationTemporality () {
219+ Map <Attributes , T > currentPoints =
220+ memoryMode == REUSABLE_DATA ? reusablePointsMap : new HashMap <>();
200221 aggregatorHandles .forEach (
201222 (attributes , handle ) -> {
202223 T value =
203224 handle .aggregateThenMaybeReset (
204225 AsynchronousMetricStorage .this .startEpochNanos ,
205226 AsynchronousMetricStorage .this .epochNanos ,
206227 attributes ,
207- reset );
228+ /* reset= */ true );
208229 currentPoints .put (attributes , value );
209230 });
210231
211- if ( memoryMode == REUSABLE_DATA ) {
212- aggregatorHandles . clear ();
213- } else {
214- aggregatorHandles = new HashMap <>( );
215- }
216-
217- Collection < T > result ;
218- if ( aggregationTemporality == AggregationTemporality . DELTA ) {
219- result = memoryMode == REUSABLE_DATA ? reusableDeltaPoints : new ArrayList <>();
220- currentPoints . forEach (
221- ( attributes , currentPoint ) -> {
222- T lastPoint = lastPoints . get ( attributes ) ;
223-
224- T deltaPoint ;
225- if ( lastPoint == null ) {
226- deltaPoint = currentPoint ;
232+ List < T > deltaPoints = memoryMode == REUSABLE_DATA ? reusablePointsList : new ArrayList <>();
233+ currentPoints . forEach (
234+ ( attributes , currentPoint ) -> {
235+ T lastPoint = lastPoints . get ( attributes );
236+
237+ T deltaPoint ;
238+ if ( lastPoint == null ) {
239+ deltaPoint = currentPoint ;
240+ } else {
241+ if ( memoryMode == REUSABLE_DATA ) {
242+ aggregator . diffInPlace ( lastPoint , currentPoint );
243+ deltaPoint = lastPoint ;
244+
245+ // Remaining last points are returned to reusablePointsPool, but
246+ // this reusable point is still used, so don't return it to pool yet
247+ lastPoints . remove ( attributes ) ;
227248 } else {
228- if (memoryMode == REUSABLE_DATA ) {
229- aggregator .diffInPlace (lastPoint , currentPoint );
230- deltaPoint = lastPoint ;
231-
232- // Remaining last points are returned to reusablePointsPool, but
233- // this reusable point is still used, so don't return it to pool yet
234- lastPoints .remove (attributes );
235- } else {
236- deltaPoint = aggregator .diff (lastPoint , currentPoint );
237- }
249+ deltaPoint = aggregator .diff (lastPoint , currentPoint );
238250 }
251+ }
239252
240- result .add (deltaPoint );
241- });
253+ deltaPoints .add (deltaPoint );
254+ });
242255
243- if (memoryMode == REUSABLE_DATA ) {
244- lastPoints .forEach ((attributes , value ) -> reusablePointsPool .returnObject (value ));
245- lastPoints .clear ();
246- }
247- } else { /* CUMULATIVE */
248- result = currentPoints .values ();
256+ if (memoryMode == REUSABLE_DATA ) {
257+ Map <Attributes , T > tmp = lastPoints ;
258+ lastPoints = reusablePointsMap ;
259+ reusablePointsMap = tmp ;
249260 }
250- this .lastPoints = currentPoints ;
251261
252- return aggregator .toMetricData (
253- resource , instrumentationScopeInfo , metricDescriptor , result , aggregationTemporality );
262+ return deltaPoints ;
263+ }
264+
265+ private Collection <T > collectWithCumulativeAggregationTemporality () {
266+ List <T > currentPoints = memoryMode == REUSABLE_DATA ? reusablePointsList : new ArrayList <>();
267+ aggregatorHandles .forEach (
268+ (attributes , handle ) -> {
269+ T value =
270+ handle .aggregateThenMaybeReset (
271+ AsynchronousMetricStorage .this .startEpochNanos ,
272+ AsynchronousMetricStorage .this .epochNanos ,
273+ attributes ,
274+ /* reset= */ false );
275+ currentPoints .add (value );
276+ });
277+ return currentPoints ;
254278 }
255279
256280 @ Override
0 commit comments