@@ -510,118 +510,43 @@ int MetricPointCount()
510
510
[ Fact ]
511
511
public void MultithreadedLongCounterTest ( )
512
512
{
513
- var exportedItems = new List < Metric > ( ) ;
514
-
515
- using var meter = new Meter ( Utils . GetCurrentMethodName ( ) ) ;
516
- var counterLong = meter . CreateCounter < long > ( "mycounter" ) ;
517
- using var meterProvider = Sdk . CreateMeterProviderBuilder ( )
518
- . AddMeter ( meter . Name )
519
- . AddReader ( new BaseExportingMetricReader ( new InMemoryExporter < Metric > ( exportedItems ) )
520
- {
521
- Temporality = AggregationTemporality . Cumulative ,
522
- } )
523
- . Build ( ) ;
524
-
525
- // setup args to threads.
526
- var mreToBlockUpdateThreads = new ManualResetEvent ( false ) ;
527
- var mreToEnsureAllThreadsStarted = new ManualResetEvent ( false ) ;
528
-
529
- var argToThread = new UpdateThreadArguments < long > ( ) ;
530
- argToThread . DeltaValueUpdatedByEachCall = deltaLongValueUpdatedByEachCall ;
531
- argToThread . Counter = counterLong ;
532
- argToThread . ThreadsStartedCount = 0 ;
533
- argToThread . MreToBlockUpdateThread = mreToBlockUpdateThreads ;
534
- argToThread . MreToEnsureAllThreadsStart = mreToEnsureAllThreadsStarted ;
535
-
536
- Thread [ ] t = new Thread [ numberOfThreads ] ;
537
- for ( int i = 0 ; i < numberOfThreads ; i ++ )
538
- {
539
- t [ i ] = new Thread ( CounterUpdateThread < long > ) ;
540
- t [ i ] . Start ( argToThread ) ;
541
- }
542
-
543
- // Block until all threads started.
544
- mreToEnsureAllThreadsStarted . WaitOne ( ) ;
545
-
546
- Stopwatch sw = Stopwatch . StartNew ( ) ;
547
-
548
- // unblock all the threads.
549
- // (i.e let them start counter.Add)
550
- mreToBlockUpdateThreads . Set ( ) ;
551
-
552
- for ( int i = 0 ; i < numberOfThreads ; i ++ )
553
- {
554
- // wait for all threads to complete
555
- t [ i ] . Join ( ) ;
556
- }
557
-
558
- var timeTakenInMilliseconds = sw . ElapsedMilliseconds ;
559
- this . output . WriteLine ( $ "Took { timeTakenInMilliseconds } msecs. Total threads: { numberOfThreads } , each thread doing { numberOfMetricUpdateByEachThread } recordings.") ;
560
-
561
- meterProvider . ForceFlush ( MaxTimeToAllowForFlush ) ;
562
-
563
- var sumReceived = GetLongSum ( exportedItems ) ;
564
- var expectedSum = deltaLongValueUpdatedByEachCall * numberOfMetricUpdateByEachThread * numberOfThreads ;
565
- Assert . Equal ( expectedSum , sumReceived ) ;
513
+ this . MultithreadedCounterTest ( deltaLongValueUpdatedByEachCall ) ;
566
514
}
567
515
568
516
[ Fact ]
569
517
public void MultithreadedDoubleCounterTest ( )
570
518
{
571
- var exportedItems = new List < Metric > ( ) ;
572
-
573
- using var meter = new Meter ( Utils . GetCurrentMethodName ( ) ) ;
574
- var counterDouble = meter . CreateCounter < double > ( "mycounter" ) ;
575
- using var meterProvider = Sdk . CreateMeterProviderBuilder ( )
576
- . AddMeter ( meter . Name )
577
- . AddReader ( new BaseExportingMetricReader ( new InMemoryExporter < Metric > ( exportedItems ) )
578
- {
579
- Temporality = AggregationTemporality . Cumulative ,
580
- } )
581
- . Build ( ) ;
582
-
583
- // setup args to threads.
584
- var mreToBlockUpdateThreads = new ManualResetEvent ( false ) ;
585
- var mreToEnsureAllThreadsStarted = new ManualResetEvent ( false ) ;
586
-
587
- var argToThread = new UpdateThreadArguments < double > ( ) ;
588
- argToThread . DeltaValueUpdatedByEachCall = deltaDoubleValueUpdatedByEachCall ;
589
- argToThread . Counter = counterDouble ;
590
- argToThread . ThreadsStartedCount = 0 ;
591
- argToThread . MreToBlockUpdateThread = mreToBlockUpdateThreads ;
592
- argToThread . MreToEnsureAllThreadsStart = mreToEnsureAllThreadsStarted ;
519
+ this . MultithreadedCounterTest ( deltaDoubleValueUpdatedByEachCall ) ;
520
+ }
593
521
594
- Thread [ ] t = new Thread [ numberOfThreads ] ;
595
- for ( int i = 0 ; i < numberOfThreads ; i ++ )
522
+ [ Fact ]
523
+ public void MultithreadedLongHistogramTest ( )
524
+ {
525
+ var expected = new long [ 11 ] ;
526
+ for ( var i = 0 ; i < expected . Length ; i ++ )
596
527
{
597
- t [ i ] = new Thread ( CounterUpdateThread < double > ) ;
598
- t [ i ] . Start ( argToThread ) ;
528
+ expected [ i ] = numberOfThreads * numberOfMetricUpdateByEachThread ;
599
529
}
600
530
601
- // Block until all threads started.
602
- mreToEnsureAllThreadsStarted . WaitOne ( ) ;
603
-
604
- Stopwatch sw = Stopwatch . StartNew ( ) ;
531
+ // Metric.DefaultHistogramBounds: 0, 5, 10, 25, 50, 75, 100, 250, 500, 1000
532
+ var values = new long [ ] { - 1 , 1 , 6 , 20 , 40 , 60 , 80 , 200 , 300 , 600 , 1001 } ;
605
533
606
- // unblock all the threads.
607
- // (i.e let them start counter.Add)
608
- mreToBlockUpdateThreads . Set ( ) ;
534
+ this . MultithreadedHistogramTest ( expected , values ) ;
535
+ }
609
536
610
- for ( int i = 0 ; i < numberOfThreads ; i ++ )
537
+ [ Fact ]
538
+ public void MultithreadedDoubleHistogramTest ( )
539
+ {
540
+ var expected = new long [ 11 ] ;
541
+ for ( var i = 0 ; i < expected . Length ; i ++ )
611
542
{
612
- // wait for all threads to complete
613
- t [ i ] . Join ( ) ;
543
+ expected [ i ] = numberOfThreads * numberOfMetricUpdateByEachThread ;
614
544
}
615
545
616
- var timeTakenInMilliseconds = sw . ElapsedMilliseconds ;
617
- this . output . WriteLine ( $ "Took { timeTakenInMilliseconds } msecs. Total threads: { numberOfThreads } , each thread doing { numberOfMetricUpdateByEachThread } recordings.") ;
618
-
619
- meterProvider . ForceFlush ( MaxTimeToAllowForFlush ) ;
546
+ // Metric.DefaultHistogramBounds: 0, 5, 10, 25, 50, 75, 100, 250, 500, 1000
547
+ var values = new double [ ] { - 1.0 , 1.0 , 6.0 , 20.0 , 40.0 , 60.0 , 80.0 , 200.0 , 300.0 , 600.0 , 1001.0 } ;
620
548
621
- var sumReceived = GetDoubleSum ( exportedItems ) ;
622
- var expectedSum = deltaDoubleValueUpdatedByEachCall * numberOfMetricUpdateByEachThread * numberOfThreads ;
623
- var difference = Math . Abs ( sumReceived - expectedSum ) ;
624
- Assert . True ( difference <= 0.0001 ) ;
549
+ this . MultithreadedHistogramTest ( expected , values ) ;
625
550
}
626
551
627
552
[ Theory ]
@@ -736,16 +661,15 @@ private static double GetDoubleSum(List<Metric> metrics)
736
661
private static void CounterUpdateThread < T > ( object obj )
737
662
where T : struct , IComparable
738
663
{
739
- var arguments = obj as UpdateThreadArguments < T > ;
740
- if ( arguments == null )
664
+ if ( obj is not UpdateThreadArguments < T > arguments )
741
665
{
742
666
throw new Exception ( "Invalid args" ) ;
743
667
}
744
668
745
669
var mre = arguments . MreToBlockUpdateThread ;
746
670
var mreToEnsureAllThreadsStart = arguments . MreToEnsureAllThreadsStart ;
747
- var counter = arguments . Counter ;
748
- var valueToUpdate = arguments . DeltaValueUpdatedByEachCall ;
671
+ var counter = arguments . Instrument as Counter < T > ;
672
+ var valueToUpdate = arguments . ValuesToRecord [ 0 ] ;
749
673
if ( Interlocked . Increment ( ref arguments . ThreadsStartedCount ) == numberOfThreads )
750
674
{
751
675
mreToEnsureAllThreadsStart . Set ( ) ;
@@ -760,14 +684,149 @@ private static void CounterUpdateThread<T>(object obj)
760
684
}
761
685
}
762
686
687
+ private static void HistogramUpdateThread < T > ( object obj )
688
+ where T : struct , IComparable
689
+ {
690
+ if ( obj is not UpdateThreadArguments < T > arguments )
691
+ {
692
+ throw new Exception ( "Invalid args" ) ;
693
+ }
694
+
695
+ var mre = arguments . MreToBlockUpdateThread ;
696
+ var mreToEnsureAllThreadsStart = arguments . MreToEnsureAllThreadsStart ;
697
+ var histogram = arguments . Instrument as Histogram < T > ;
698
+
699
+ if ( Interlocked . Increment ( ref arguments . ThreadsStartedCount ) == numberOfThreads )
700
+ {
701
+ mreToEnsureAllThreadsStart . Set ( ) ;
702
+ }
703
+
704
+ // Wait until signalled to start calling update on aggregator
705
+ mre . WaitOne ( ) ;
706
+
707
+ for ( int i = 0 ; i < numberOfMetricUpdateByEachThread ; i ++ )
708
+ {
709
+ for ( int j = 0 ; j < arguments . ValuesToRecord . Length ; j ++ )
710
+ {
711
+ histogram . Record ( arguments . ValuesToRecord [ j ] ) ;
712
+ }
713
+ }
714
+ }
715
+
716
+ private void MultithreadedCounterTest < T > ( T deltaValueUpdatedByEachCall )
717
+ where T : struct , IComparable
718
+ {
719
+ var metricItems = new List < Metric > ( ) ;
720
+ var metricReader = new BaseExportingMetricReader ( new InMemoryExporter < Metric > ( metricItems ) ) ;
721
+
722
+ using var meter = new Meter ( $ "{ Utils . GetCurrentMethodName ( ) } .{ typeof ( T ) . Name } .{ deltaValueUpdatedByEachCall } ") ;
723
+ using var meterProvider = Sdk . CreateMeterProviderBuilder ( )
724
+ . AddMeter ( meter . Name )
725
+ . AddReader ( metricReader )
726
+ . Build ( ) ;
727
+
728
+ var argToThread = new UpdateThreadArguments < T >
729
+ {
730
+ ValuesToRecord = new T [ ] { deltaValueUpdatedByEachCall } ,
731
+ Instrument = meter . CreateCounter < T > ( "counter" ) ,
732
+ MreToBlockUpdateThread = new ManualResetEvent ( false ) ,
733
+ MreToEnsureAllThreadsStart = new ManualResetEvent ( false ) ,
734
+ } ;
735
+
736
+ Thread [ ] t = new Thread [ numberOfThreads ] ;
737
+ for ( int i = 0 ; i < numberOfThreads ; i ++ )
738
+ {
739
+ t [ i ] = new Thread ( CounterUpdateThread < T > ) ;
740
+ t [ i ] . Start ( argToThread ) ;
741
+ }
742
+
743
+ argToThread . MreToEnsureAllThreadsStart . WaitOne ( ) ;
744
+ Stopwatch sw = Stopwatch . StartNew ( ) ;
745
+ argToThread . MreToBlockUpdateThread . Set ( ) ;
746
+
747
+ for ( int i = 0 ; i < numberOfThreads ; i ++ )
748
+ {
749
+ t [ i ] . Join ( ) ;
750
+ }
751
+
752
+ this . output . WriteLine ( $ "Took { sw . ElapsedMilliseconds } msecs. Total threads: { numberOfThreads } , each thread doing { numberOfMetricUpdateByEachThread } recordings.") ;
753
+
754
+ metricReader . Collect ( ) ;
755
+
756
+ if ( typeof ( T ) == typeof ( long ) )
757
+ {
758
+ var sumReceived = GetLongSum ( metricItems ) ;
759
+ var expectedSum = deltaLongValueUpdatedByEachCall * numberOfMetricUpdateByEachThread * numberOfThreads ;
760
+ Assert . Equal ( expectedSum , sumReceived ) ;
761
+ }
762
+ else if ( typeof ( T ) == typeof ( double ) )
763
+ {
764
+ var sumReceived = GetDoubleSum ( metricItems ) ;
765
+ var expectedSum = deltaDoubleValueUpdatedByEachCall * numberOfMetricUpdateByEachThread * numberOfThreads ;
766
+ Assert . Equal ( expectedSum , sumReceived , 2 ) ;
767
+ }
768
+ }
769
+
770
+ private void MultithreadedHistogramTest < T > ( long [ ] expected , T [ ] values )
771
+ where T : struct , IComparable
772
+ {
773
+ var bucketCounts = new long [ 11 ] ;
774
+ var metricReader = new BaseExportingMetricReader ( new TestExporter < Metric > ( batch =>
775
+ {
776
+ foreach ( var metric in batch )
777
+ {
778
+ foreach ( var metricPoint in metric . GetMetricPoints ( ) )
779
+ {
780
+ bucketCounts = metricPoint . GetHistogramBuckets ( ) . BucketCounts ;
781
+ }
782
+ }
783
+ } ) ) ;
784
+
785
+ using var meter = new Meter ( $ "{ Utils . GetCurrentMethodName ( ) } .{ typeof ( T ) . Name } ") ;
786
+ using var meterProvider = Sdk . CreateMeterProviderBuilder ( )
787
+ . AddMeter ( meter . Name )
788
+ . AddReader ( metricReader )
789
+ . Build ( ) ;
790
+
791
+ var argsToThread = new UpdateThreadArguments < T >
792
+ {
793
+ Instrument = meter . CreateHistogram < T > ( "histogram" ) ,
794
+ MreToBlockUpdateThread = new ManualResetEvent ( false ) ,
795
+ MreToEnsureAllThreadsStart = new ManualResetEvent ( false ) ,
796
+ ValuesToRecord = values ,
797
+ } ;
798
+
799
+ Thread [ ] t = new Thread [ numberOfThreads ] ;
800
+ for ( int i = 0 ; i < numberOfThreads ; i ++ )
801
+ {
802
+ t [ i ] = new Thread ( HistogramUpdateThread < T > ) ;
803
+ t [ i ] . Start ( argsToThread ) ;
804
+ }
805
+
806
+ argsToThread . MreToEnsureAllThreadsStart . WaitOne ( ) ;
807
+ Stopwatch sw = Stopwatch . StartNew ( ) ;
808
+ argsToThread . MreToBlockUpdateThread . Set ( ) ;
809
+
810
+ for ( int i = 0 ; i < numberOfThreads ; i ++ )
811
+ {
812
+ t [ i ] . Join ( ) ;
813
+ }
814
+
815
+ this . output . WriteLine ( $ "Took { sw . ElapsedMilliseconds } msecs. Total threads: { numberOfThreads } , each thread doing { numberOfMetricUpdateByEachThread * values . Length } recordings.") ;
816
+
817
+ metricReader . Collect ( ) ;
818
+
819
+ Assert . Equal ( expected , bucketCounts ) ;
820
+ }
821
+
763
822
private class UpdateThreadArguments < T >
764
823
where T : struct , IComparable
765
824
{
766
825
public ManualResetEvent MreToBlockUpdateThread ;
767
826
public ManualResetEvent MreToEnsureAllThreadsStart ;
768
827
public int ThreadsStartedCount ;
769
- public Counter < T > Counter ;
770
- public T DeltaValueUpdatedByEachCall ;
828
+ public Instrument < T > Instrument ;
829
+ public T [ ] ValuesToRecord ;
771
830
}
772
831
}
773
832
}
0 commit comments