@@ -53,6 +53,7 @@ public final class AsynchronousMetricStorage<T extends PointData, U extends Exem
5353 private final AggregationTemporality aggregationTemporality ;
5454 private final Aggregator <T , U > aggregator ;
5555 private final AttributesProcessor attributesProcessor ;
56+ private final MemoryMode memoryMode ;
5657
5758 /**
5859 * This field is set to 1 less than the actual intended cardinality limit, allowing the last slot
@@ -63,14 +64,14 @@ public final class AsynchronousMetricStorage<T extends PointData, U extends Exem
6364 // Only populated if aggregationTemporality == DELTA
6465 private Map <Attributes , T > lastPoints ;
6566
67+ private Map <Attributes , AggregatorHandle <T , U >> aggregatorHandles = new HashMap <>();
68+
6669 // Only populated if memoryMode == REUSABLE_DATA
6770 private final ObjectPool <T > reusablePointsPool ;
6871 private final List <T > reusableDeltaPoints = new ArrayList <>();
6972
70- private Map <Attributes , AggregatorHandle <T , U >> aggregatorHandles = new HashMap <>();
71- private Map <Attributes , Measurement > latestRecordedMeasurements = new HashMap <>();
72-
73- private final MemoryMode memoryMode ;
73+ private long startEpochNanos ;
74+ private long epochNanos ;
7475
7576 private AsynchronousMetricStorage (
7677 RegisteredReader registeredReader ,
@@ -91,13 +92,11 @@ private AsynchronousMetricStorage(
9192 this .reusablePointsPool = new ObjectPool <>(aggregator ::createReusablePoint );
9293
9394 if (memoryMode == REUSABLE_DATA ) {
94- lastPoints = new PooledHashMap <>();
95- aggregatorHandles = new PooledHashMap <>();
96- latestRecordedMeasurements = new PooledHashMap <>();
95+ this .lastPoints = new PooledHashMap <>();
96+ this .aggregatorHandles = new PooledHashMap <>();
9797 } else {
98- lastPoints = new HashMap <>();
99- aggregatorHandles = new HashMap <>();
100- latestRecordedMeasurements = new HashMap <>();
98+ this .lastPoints = new HashMap <>();
99+ this .aggregatorHandles = new HashMap <>();
101100 }
102101 }
103102
@@ -127,24 +126,34 @@ AsynchronousMetricStorage<T, U> create(
127126 registeredView .getCardinalityLimit ());
128127 }
129128
130- /**
131- * Record callback measurement from {@link ObservableLongMeasurement} or {@link
132- * ObservableDoubleMeasurement}.
133- */
134- void record (Measurement measurement ) {
135- Context context = Context .current ();
136- Attributes processedAttributes = attributesProcessor .process (measurement .attributes (), context );
137- long start =
138- aggregationTemporality == AggregationTemporality .DELTA
139- ? registeredReader .getLastCollectEpochNanos ()
140- : measurement .startEpochNanos ();
129+ /** Record callback measurement from {@link ObservableLongMeasurement}. */
130+ void record (Attributes attributes , long value ) {
131+ attributes = validateAndProcessAttributes (attributes );
132+ AggregatorHandle <T , U > handle =
133+ aggregatorHandles .computeIfAbsent (attributes , key -> aggregator .createHandle ());
134+ handle .recordLong (value , attributes , Context .current ());
135+ }
141136
142- measurement = measurement .withAttributes (processedAttributes ).withStartEpochNanos (start );
137+ /** Record callback measurement from {@link ObservableDoubleMeasurement}. */
138+ void record (Attributes attributes , double value ) {
139+ attributes = validateAndProcessAttributes (attributes );
140+ AggregatorHandle <T , U > handle =
141+ aggregatorHandles .computeIfAbsent (attributes , key -> aggregator .createHandle ());
142+ handle .recordDouble (value , attributes , Context .current ());
143+ }
143144
144- recordPoint (processedAttributes , measurement );
145+ void setEpochInformation (long startEpochNanos , long epochNanos ) {
146+ this .startEpochNanos =
147+ aggregationTemporality == AggregationTemporality .DELTA
148+ ? registeredReader .getLastCollectEpochNanos ()
149+ : startEpochNanos ;
150+ this .epochNanos = epochNanos ;
145151 }
146152
147- private void recordPoint (Attributes attributes , Measurement measurement ) {
153+ private Attributes validateAndProcessAttributes (Attributes attributes ) {
154+ Context context = Context .current ();
155+ attributes = attributesProcessor .process (attributes , context );
156+
148157 if (aggregatorHandles .size () >= maxCardinality ) {
149158 throttlingLogger .log (
150159 Level .WARNING ,
@@ -153,9 +162,10 @@ private void recordPoint(Attributes attributes, Measurement measurement) {
153162 + " has exceeded the maximum allowed cardinality ("
154163 + maxCardinality
155164 + ")." );
156- attributes = MetricStorage .CARDINALITY_OVERFLOW ;
157- measurement = measurement .withAttributes (attributes );
158- } else if (aggregatorHandles .containsKey (
165+ return MetricStorage .CARDINALITY_OVERFLOW ;
166+ }
167+
168+ if (aggregatorHandles .containsKey (
159169 attributes )) { // Check there is not already a recording for the attributes
160170 throttlingLogger .log (
161171 Level .WARNING ,
@@ -165,31 +175,7 @@ private void recordPoint(Attributes attributes, Measurement measurement) {
165175 + attributes );
166176 }
167177
168- Measurement recentMeasurement = latestRecordedMeasurements .get (attributes );
169- if (recentMeasurement != null
170- && (recentMeasurement .startEpochNanos () != measurement .startEpochNanos ()
171- || recentMeasurement .epochNanos () != measurement .epochNanos ())) {
172- String msg =
173- String .format (
174- "Instrument "
175- + metricDescriptor .getSourceInstrument ().getName ()
176- + " has recorded multiple values for the same attributes (%s) with different metadata: %s/%s" ,
177- attributes ,
178- recentMeasurement ,
179- measurement );
180- throttlingLogger .log (Level .WARNING , msg );
181- }
182- latestRecordedMeasurements .put (attributes , measurement );
183-
184- AggregatorHandle <T , U > handle =
185- aggregatorHandles .computeIfAbsent (attributes , key -> aggregator .createHandle ());
186- if (measurement .hasDoubleValue ()) {
187- handle .recordDouble (measurement .doubleValue (), attributes , Context .current ());
188- } else if (measurement .hasLongValue ()) {
189- handle .recordLong (measurement .longValue (), attributes , Context .current ());
190- } else {
191- throw new IllegalStateException ();
192- }
178+ return attributes ;
193179 }
194180
195181 @ Override
@@ -220,31 +206,24 @@ public MetricData collect(
220206 Map <Attributes , T > currentPoints = new HashMap <>();
221207 aggregatorHandles .forEach (
222208 (attributes , handle ) -> {
223- Measurement latestRecordedMeasurement = latestRecordedMeasurements .get (attributes );
224- if (latestRecordedMeasurement == null ) {
225- throw new IllegalStateException ("Unexpected" );
226- }
227209 T value =
228210 handle .aggregateThenMaybeReset (
229- latestRecordedMeasurement . startEpochNanos () ,
230- latestRecordedMeasurement . epochNanos () ,
211+ AsynchronousMetricStorage . this . startEpochNanos ,
212+ AsynchronousMetricStorage . this . epochNanos ,
231213 attributes ,
232214 reset );
233215 currentPoints .put (attributes , value );
234216 });
235217
236218 if (memoryMode == REUSABLE_DATA ) {
237219 aggregatorHandles .clear ();
238- latestRecordedMeasurements .clear ();
239220 } else {
240221 aggregatorHandles = new HashMap <>();
241- latestRecordedMeasurements = new HashMap <>();
242222 }
243223
244224 Collection <T > result ;
245225 if (aggregationTemporality == AggregationTemporality .DELTA ) {
246- Collection <T > deltaPoints =
247- memoryMode == REUSABLE_DATA ? reusableDeltaPoints : new ArrayList <>();
226+ result = memoryMode == REUSABLE_DATA ? reusableDeltaPoints : new ArrayList <>();
248227 currentPoints .forEach (
249228 (attributes , currentPoint ) -> {
250229 T lastPoint = lastPoints .get (attributes );
@@ -265,15 +244,13 @@ public MetricData collect(
265244 }
266245 }
267246
268- deltaPoints .add (deltaPoint );
247+ result .add (deltaPoint );
269248 });
270249
271250 if (memoryMode == REUSABLE_DATA ) {
272251 lastPoints .forEach ((attributes , value ) -> reusablePointsPool .returnObject (value ));
273252 lastPoints .clear ();
274253 }
275-
276- result = deltaPoints ;
277254 } else {
278255 result = currentPoints .values ();
279256 }
0 commit comments