@@ -20,6 +20,8 @@ public class SynchronousMetricStorage: SynchronousMetricStorageProtocol {
2020 var aggregatorHandles = [ [ String: AttributeValue] : AggregatorHandle] ( )
2121 let attributeProcessor : AttributeProcessor
2222 var aggregatorHandlePool = [ AggregatorHandle] ( )
23+ private let aggregatorHandlesQueue = DispatchQueue ( label: " org.opentelemetry.SynchronousMetricStorage.aggregatorHandlesQueue " , attributes: . concurrent)
24+
2325
2426 static func empty( ) -> SynchronousMetricStorageProtocol {
2527 return EmptyMetricStorage . instance
@@ -48,22 +50,24 @@ public class SynchronousMetricStorage: SynchronousMetricStorageProtocol {
4850 }
4951
5052 private func getAggregatorHandle( attributes: [ String : AttributeValue ] ) throws -> AggregatorHandle {
51- let processedAttributes = attributeProcessor . process ( incoming : attributes )
52- if let handle = aggregatorHandles [ processedAttributes ] {
53- return handle
54- }
55- if aggregatorHandles . count >= MetricStorageConstants . MAX_CARDINALITY {
56- // error
57- throw MetricStoreError . maxCardinality
58- }
59-
60- let newHandle = aggregatorHandlePool . isEmpty ? aggregator . createHandle ( ) : aggregatorHandlePool . remove ( at : 0 )
61- if let existingHandle = aggregatorHandles [ processedAttributes ] {
62- return existingHandle
63- } else {
53+ var aggregatorHandle : AggregatorHandle !
54+ try aggregatorHandlesQueue . sync {
55+ let processedAttributes = attributeProcessor . process ( incoming : attributes )
56+ if let handle = aggregatorHandles [ processedAttributes ] {
57+ aggregatorHandle = handle
58+ return
59+ }
60+
61+ guard aggregatorHandles . count < MetricStorageConstants . MAX_CARDINALITY else {
62+ throw MetricStoreError . maxCardinality
63+ }
64+
65+ let newHandle = aggregatorHandlePool . isEmpty ? aggregator . createHandle ( ) : aggregatorHandlePool . remove ( at : 0 )
6466 aggregatorHandles [ processedAttributes] = newHandle
65- return newHandle
67+ aggregatorHandle = newHandle
6668 }
69+ return aggregatorHandle
70+
6771 }
6872
6973 public func collect( resource: Resource , scope: InstrumentationScopeInfo , startEpochNanos: UInt64 , epochNanos: UInt64 ) -> StableMetricData {
@@ -72,13 +76,15 @@ public class SynchronousMetricStorage: SynchronousMetricStorageProtocol {
7276
7377 var points = [ PointData] ( )
7478
75- aggregatorHandles. forEach { key, value in
76- let point = value. aggregateThenMaybeReset ( startEpochNano: start, endEpochNano: epochNanos, attributes: key, reset: reset)
77- if reset {
78- aggregatorHandles. removeValue ( forKey: key)
79- aggregatorHandlePool. append ( value)
79+ aggregatorHandlesQueue. sync ( flags: . barrier) {
80+ aggregatorHandles. forEach { key, value in
81+ let point = value. aggregateThenMaybeReset ( startEpochNano: start, endEpochNano: epochNanos, attributes: key, reset: reset)
82+ if reset {
83+ aggregatorHandles. removeValue ( forKey: key)
84+ aggregatorHandlePool. append ( value)
85+ }
86+ points. append ( point)
8087 }
81- points. append ( point)
8288 }
8389
8490 if points. isEmpty {
0 commit comments