@@ -14,17 +14,18 @@ internal class MetricAggregator : IMetricAggregator
1414 private readonly TimeSpan _flushInterval ;
1515
1616 private readonly SemaphoreSlim _codeLocationLock = new ( 1 , 1 ) ;
17+ private readonly ReaderWriterLockSlim _bucketsLock = new ReaderWriterLockSlim ( ) ;
1718
1819 private readonly CancellationTokenSource _shutdownSource ;
1920 private volatile bool _disposed ;
2021
2122 // The key for this dictionary is the Timestamp for the bucket, rounded down to the nearest RollupInSeconds... so it
2223 // aggregates all of the metrics data for a particular time period. The Value is a dictionary for the metrics,
2324 // each of which has a key that uniquely identifies it within the time period
24- internal ConcurrentDictionary < long , ConcurrentDictionary < string , Metric > > Buckets => _buckets . Value ;
25+ internal Dictionary < long , ConcurrentDictionary < string , Metric > > Buckets => _buckets . Value ;
2526
26- private readonly Lazy < ConcurrentDictionary < long , ConcurrentDictionary < string , Metric > > > _buckets
27- = new ( ( ) => new ConcurrentDictionary < long , ConcurrentDictionary < string , Metric > > ( ) ) ;
27+ private readonly Lazy < Dictionary < long , ConcurrentDictionary < string , Metric > > > _buckets
28+ = new ( ( ) => new Dictionary < long , ConcurrentDictionary < string , Metric > > ( ) ) ;
2829
2930 private long _lastClearedStaleLocations = DateTimeOffset . UtcNow . GetDayBucketKey ( ) ;
3031 private readonly ConcurrentDictionary < long , HashSet < MetricResourceIdentifier > > _seenLocations = new ( ) ;
@@ -183,10 +184,7 @@ private void Emit(
183184 _ => throw new ArgumentOutOfRangeException ( nameof ( type ) , type , "Unknown MetricType" )
184185 } ;
185186
186- var timeBucket = Buckets . GetOrAdd (
187- timestamp . Value . GetTimeBucketKey ( ) ,
188- _ => new ConcurrentDictionary < string , Metric > ( )
189- ) ;
187+ var timeBucket = GetOrAddTimeBucket ( timestamp . Value . GetTimeBucketKey ( ) ) ;
190188
191189 timeBucket . AddOrUpdate (
192190 GetMetricBucketKey ( type , key , unit . Value , tags ) ,
@@ -216,6 +214,40 @@ private void Emit(
216214 }
217215 }
218216
217+ private ConcurrentDictionary < string , Metric > GetOrAddTimeBucket ( long bucketKey )
218+ {
219+ _bucketsLock . EnterUpgradeableReadLock ( ) ;
220+ try
221+ {
222+ if ( Buckets . TryGetValue ( bucketKey , out var existingBucket ) )
223+ {
224+ return existingBucket ;
225+ }
226+
227+ _bucketsLock . EnterWriteLock ( ) ;
228+ try
229+ {
230+ // Check again in case another thread added the bucket while we were waiting for the write lock
231+ if ( Buckets . TryGetValue ( bucketKey , out existingBucket ) )
232+ {
233+ return existingBucket ;
234+ }
235+
236+ var timeBucket = new ConcurrentDictionary < string , Metric > ( ) ;
237+ Buckets [ bucketKey ] = timeBucket ;
238+ return timeBucket ;
239+ }
240+ finally
241+ {
242+ _bucketsLock . ExitWriteLock ( ) ;
243+ }
244+ }
245+ finally
246+ {
247+ _bucketsLock . ExitUpgradeableReadLock ( ) ;
248+ }
249+ }
250+
219251 internal void RecordCodeLocation (
220252 MetricType type ,
221253 string key ,
@@ -334,9 +366,20 @@ public async Task FlushAsync(bool force = true, CancellationToken cancellationTo
334366 cancellationToken . ThrowIfCancellationRequested ( ) ;
335367
336368 _options . LogDebug ( "Flushing metrics for bucket {0}" , key ) ;
337- if ( ! Buckets . TryRemove ( key , out var bucket ) )
369+ ConcurrentDictionary < string , Metric > ? bucket ;
370+ _bucketsLock . EnterWriteLock ( ) ;
371+ try
372+ {
373+ if ( ! Buckets . ContainsKey ( key ) )
374+ {
375+ continue ;
376+ }
377+ bucket = Buckets [ key ] ;
378+ Buckets . Remove ( key ) ;
379+ }
380+ finally
338381 {
339- continue ;
382+ _bucketsLock . ExitWriteLock ( ) ;
340383 }
341384
342385 _captureMetrics ( bucket . Values ) ;
@@ -383,18 +426,28 @@ internal IEnumerable<long> GetFlushableBuckets(bool force = false)
383426 yield break ;
384427 }
385428
429+ long [ ] keys ;
430+ _bucketsLock . EnterReadLock ( ) ;
431+ try
432+ {
433+ keys = Buckets . Keys . ToArray ( ) ;
434+ }
435+ finally
436+ {
437+ _bucketsLock . ExitReadLock ( ) ;
438+ }
386439 if ( force )
387440 {
388441 // Return all the buckets in this case
389- foreach ( var key in Buckets . Keys )
442+ foreach ( var key in keys )
390443 {
391444 yield return key ;
392445 }
393446 }
394447 else
395448 {
396449 var cutoff = MetricHelper . GetCutoff ( ) ;
397- foreach ( var key in Buckets . Keys )
450+ foreach ( var key in keys )
398451 {
399452 var bucketTime = DateTimeOffset . FromUnixTimeSeconds ( key ) ;
400453 if ( bucketTime < cutoff )
0 commit comments