2121import io .opentelemetry .sdk .metrics .data .PointData ;
2222import io .opentelemetry .sdk .metrics .internal .aggregator .Aggregator ;
2323import io .opentelemetry .sdk .metrics .internal .aggregator .AggregatorFactory ;
24+ import io .opentelemetry .sdk .metrics .internal .aggregator .AggregatorHandle ;
2425import io .opentelemetry .sdk .metrics .internal .descriptor .InstrumentDescriptor ;
2526import io .opentelemetry .sdk .metrics .internal .descriptor .MetricDescriptor ;
2627import io .opentelemetry .sdk .metrics .internal .exemplar .ExemplarFilter ;
3132import java .util .ArrayList ;
3233import java .util .Collection ;
3334import java .util .HashMap ;
35+ import java .util .List ;
3436import java .util .Map ;
3537import java .util .logging .Level ;
3638import java .util .logging .Logger ;
@@ -58,16 +60,15 @@ public final class AsynchronousMetricStorage<T extends PointData, U extends Exem
5860 */
5961 private final int maxCardinality ;
6062
61- private Map <Attributes , T > points ;
62-
6363 // Only populated if aggregationTemporality == DELTA
6464 private Map <Attributes , T > lastPoints ;
6565
6666 // Only populated if memoryMode == REUSABLE_DATA
6767 private final ObjectPool <T > reusablePointsPool ;
68+ private final List <T > reusableDeltaPoints = new ArrayList <>();
6869
69- // Only populated if memoryMode == REUSABLE_DATA
70- private final ArrayList < T > reusableResultList = new ArrayList <>();
70+ private Map < Attributes , AggregatorHandle < T , U >> aggregatorHandles = new HashMap <>();
71+ private Map < Attributes , Measurement > latestRecordedMeasurements = new HashMap <>();
7172
7273 private final MemoryMode memoryMode ;
7374
@@ -88,12 +89,15 @@ private AsynchronousMetricStorage(
8889 this .attributesProcessor = attributesProcessor ;
8990 this .maxCardinality = maxCardinality - 1 ;
9091 this .reusablePointsPool = new ObjectPool <>(aggregator ::createReusablePoint );
92+
9193 if (memoryMode == REUSABLE_DATA ) {
9294 lastPoints = new PooledHashMap <>();
93- points = new PooledHashMap <>();
95+ aggregatorHandles = new PooledHashMap <>();
96+ latestRecordedMeasurements = new PooledHashMap <>();
9497 } else {
9598 lastPoints = new HashMap <>();
96- points = new HashMap <>();
99+ aggregatorHandles = new HashMap <>();
100+ latestRecordedMeasurements = new HashMap <>();
97101 }
98102 }
99103
@@ -141,7 +145,7 @@ void record(Measurement measurement) {
141145 }
142146
143147 private void recordPoint (Attributes attributes , Measurement measurement ) {
144- if (points .size () >= maxCardinality ) {
148+ if (aggregatorHandles .size () >= maxCardinality ) {
145149 throttlingLogger .log (
146150 Level .WARNING ,
147151 "Instrument "
@@ -151,26 +155,41 @@ private void recordPoint(Attributes attributes, Measurement measurement) {
151155 + ")." );
152156 attributes = MetricStorage .CARDINALITY_OVERFLOW ;
153157 measurement = measurement .withAttributes (attributes );
154- } else if (points .containsKey (
158+ } else if (aggregatorHandles .containsKey (
155159 attributes )) { // Check there is not already a recording for the attributes
156160 throttlingLogger .log (
157161 Level .WARNING ,
158162 "Instrument "
159163 + metricDescriptor .getSourceInstrument ().getName ()
160164 + " has recorded multiple values for the same attributes: "
161165 + attributes );
162- return ;
163166 }
164167
165- T dataPoint ;
166- if (memoryMode == REUSABLE_DATA ) {
167- dataPoint = reusablePointsPool .borrowObject ();
168- aggregator .toPoint (measurement , dataPoint );
168+ Measurement recentMeasurement = latestRecordedMeasurements .get (attributes );
169+ if (recentMeasurement != null
170+ && (recentMeasurement .startEpochNanos () != measurement .startEpochNanos ()
171+ || recentMeasurement .epochNanos () != measurement .epochNanos ())) {
172+ String msg =
173+ String .format (
174+ "Instrument "
175+ + metricDescriptor .getSourceInstrument ().getName ()
176+ + " has recorded multiple values for the same attributes (%s) with different metadata: %s/%s" ,
177+ attributes ,
178+ recentMeasurement ,
179+ measurement );
180+ throttlingLogger .log (Level .WARNING , msg );
181+ }
182+ latestRecordedMeasurements .put (attributes , measurement );
183+
184+ AggregatorHandle <T , U > handle =
185+ aggregatorHandles .computeIfAbsent (attributes , key -> aggregator .createHandle ());
186+ if (measurement .hasDoubleValue ()) {
187+ handle .recordDouble (measurement .doubleValue (), attributes , Context .current ());
188+ } else if (measurement .hasLongValue ()) {
189+ handle .recordLong (measurement .longValue (), attributes , Context .current ());
169190 } else {
170- dataPoint = aggregator . toPoint ( measurement );
191+ throw new IllegalStateException ( );
171192 }
172-
173- points .put (attributes , dataPoint );
174193 }
175194
176195 @ Override
@@ -189,73 +208,76 @@ public MetricData collect(
189208 InstrumentationScopeInfo instrumentationScopeInfo ,
190209 long startEpochNanos ,
191210 long epochNanos ) {
211+ boolean reset = aggregationTemporality == AggregationTemporality .DELTA ;
212+
192213 if (memoryMode == REUSABLE_DATA ) {
193214 // Collect can not run concurrently for same reader, hence we safely assume
194215 // the previous collect result has been used and done with
195- reusableResultList .forEach (reusablePointsPool ::returnObject );
196- reusableResultList .clear ();
216+ reusableDeltaPoints .forEach (reusablePointsPool ::returnObject );
217+ reusableDeltaPoints .clear ();
197218 }
198219
199- Collection <T > result ;
200- if (aggregationTemporality == AggregationTemporality .DELTA ) {
201- Map <Attributes , T > points = this .points ;
202- Map <Attributes , T > lastPoints = this .lastPoints ;
220+ Map <Attributes , T > currentPoints = new HashMap <>();
221+ aggregatorHandles .forEach (
222+ (attributes , handle ) -> {
223+ Measurement latestRecordedMeasurement = latestRecordedMeasurements .get (attributes );
224+ if (latestRecordedMeasurement == null ) {
225+ throw new IllegalStateException ("Unexpected" );
226+ }
227+ T value =
228+ handle .aggregateThenMaybeReset (
229+ latestRecordedMeasurement .startEpochNanos (),
230+ latestRecordedMeasurement .epochNanos (),
231+ attributes ,
232+ reset );
233+ currentPoints .put (attributes , value );
234+ });
203235
204- Collection <T > deltaPoints ;
205- if (memoryMode == REUSABLE_DATA ) {
206- deltaPoints = reusableResultList ;
207- } else {
208- deltaPoints = new ArrayList <>();
209- }
236+ if (memoryMode == REUSABLE_DATA ) {
237+ aggregatorHandles .clear ();
238+ latestRecordedMeasurements .clear ();
239+ } else {
240+ aggregatorHandles = new HashMap <>();
241+ latestRecordedMeasurements = new HashMap <>();
242+ }
210243
211- points .forEach (
212- (k , v ) -> {
213- T lastPoint = lastPoints .get (k );
244+ Collection <T > result ;
245+ if (aggregationTemporality == AggregationTemporality .DELTA ) {
246+ Collection <T > deltaPoints =
247+ memoryMode == REUSABLE_DATA ? reusableDeltaPoints : new ArrayList <>();
248+ currentPoints .forEach (
249+ (attributes , currentPoint ) -> {
250+ T lastPoint = lastPoints .get (attributes );
214251
215252 T deltaPoint ;
216253 if (lastPoint == null ) {
217- if (memoryMode == REUSABLE_DATA ) {
218- deltaPoint = reusablePointsPool .borrowObject ();
219- aggregator .copyPoint (v , deltaPoint );
220- } else {
221- deltaPoint = v ;
222- }
254+ deltaPoint = currentPoint ;
223255 } else {
224256 if (memoryMode == REUSABLE_DATA ) {
225- aggregator .diffInPlace (lastPoint , v );
257+ aggregator .diffInPlace (lastPoint , currentPoint );
226258 deltaPoint = lastPoint ;
227259
228260 // Remaining last points are returned to reusablePointsPool, but
229261 // this reusable point is still used, so don't return it to pool yet
230- lastPoints .remove (k );
262+ lastPoints .remove (attributes );
231263 } else {
232- deltaPoint = aggregator .diff (lastPoint , v );
264+ deltaPoint = aggregator .diff (lastPoint , currentPoint );
233265 }
234266 }
235267
236268 deltaPoints .add (deltaPoint );
237269 });
238270
239271 if (memoryMode == REUSABLE_DATA ) {
240- lastPoints .forEach ((k , v ) -> reusablePointsPool .returnObject (v ));
272+ lastPoints .forEach ((attributes , value ) -> reusablePointsPool .returnObject (value ));
241273 lastPoints .clear ();
242- this .points = lastPoints ;
243- } else {
244- this .points = new HashMap <>();
245274 }
246275
247- this .lastPoints = points ;
248276 result = deltaPoints ;
249- } else /* CUMULATIVE */ {
250- if (memoryMode == REUSABLE_DATA ) {
251- points .forEach ((k , v ) -> reusableResultList .add (v ));
252- points .clear ();
253- result = reusableResultList ;
254- } else {
255- result = points .values ();
256- points = new HashMap <>();
257- }
277+ } else {
278+ result = currentPoints .values ();
258279 }
280+ this .lastPoints = currentPoints ;
259281
260282 return aggregator .toMetricData (
261283 resource , instrumentationScopeInfo , metricDescriptor , result , aggregationTemporality );
0 commit comments