3030import java .util .function .Consumer ;
3131import java .util .function .Function ;
3232import java .util .function .Predicate ;
33+ import java .util .stream .Collectors ;
34+ import java .util .stream .Stream ;
3335
36+ import static java .util .Map .entry ;
3437import static org .hamcrest .Matchers .closeTo ;
3538import static org .hamcrest .Matchers .equalTo ;
3639
@@ -57,27 +60,21 @@ protected String getTestRestCluster() {
5760
5861 @ SuppressWarnings ("unchecked" )
5962 public void testApmIntegration () throws Exception {
60- Map <String , Predicate <Map <String , Object >>> sampleAssertions = new HashMap <>(
63+ Map <String , Predicate <Map <String , Object >>> valueAssertions = new HashMap <>(
6164 Map .ofEntries (
6265 assertion ("es.test.long_counter.total" , m -> (Double ) m .get ("value" ), closeTo (1.0 , 0.001 )),
6366 assertion ("es.test.double_counter.total" , m -> (Double ) m .get ("value" ), closeTo (1.0 , 0.001 )),
6467 assertion ("es.test.async_double_counter.total" , m -> (Double ) m .get ("value" ), closeTo (1.0 , 0.001 )),
6568 assertion ("es.test.async_long_counter.total" , m -> (Integer ) m .get ("value" ), equalTo (1 )),
6669 assertion ("es.test.double_gauge.current" , m -> (Double ) m .get ("value" ), closeTo (1.0 , 0.001 )),
67- assertion ("es.test.long_gauge.current" , m -> (Integer ) m .get ("value" ), equalTo (1 )),
68- assertion (
69- "es.test.double_histogram.histogram" ,
70- m -> ((Collection <Integer >) m .get ("counts" )).stream ().mapToInt (Integer ::intValue ).sum (),
71- equalTo (2 )
72- ),
73- assertion (
74- "es.test.long_histogram.histogram" ,
75- m -> ((Collection <Integer >) m .get ("counts" )).stream ().mapToInt (Integer ::intValue ).sum (),
76- equalTo (2 )
77- )
70+ assertion ("es.test.long_gauge.current" , m -> (Integer ) m .get ("value" ), equalTo (1 ))
7871 )
7972 );
8073
74+ Map <String , Integer > histogramAssertions = new HashMap <>(
75+ Map .ofEntries (entry ("es.test.double_histogram.histogram" , 2 ), entry ("es.test.long_histogram.histogram" , 2 ))
76+ );
77+
8178 CountDownLatch finished = new CountDownLatch (1 );
8279
8380 // a consumer that will remove the assertions from a map once it matched
@@ -89,15 +86,36 @@ public void testApmIntegration() throws Exception {
8986 var metricset = (Map <String , Object >) apmMessage .get ("metricset" );
9087 var samples = (Map <String , Object >) metricset .get ("samples" );
9188
92- samples .entrySet ().forEach (sampleEntry -> {
93- var assertion = sampleAssertions .get (sampleEntry .getKey ());// sample name
94- if (assertion != null && assertion .test ((Map <String , Object >) sampleEntry .getValue ())) {// sample object
95- sampleAssertions .remove (sampleEntry .getKey ());
89+ samples .forEach ((key , value ) -> {
90+ var valueAssertion = valueAssertions .get (key );// sample name
91+ if (valueAssertion != null ) {
92+ logger .info ("Matched {}:{}" , key , value );
93+ var sampleObject = (Map <String , Object >) value ;
94+ if (valueAssertion .test (sampleObject )) {// sample object
95+ logger .info ("{} assertion PASSED" , key );
96+ valueAssertions .remove (key );
97+ } else {
98+ logger .error ("{} assertion FAILED" , key );
99+ }
100+ }
101+ var histogramAssertion = histogramAssertions .get (key );
102+ if (histogramAssertion != null ) {
103+ logger .info ("Matched {}:{}" , key , value );
104+ var samplesObject = (Map <String , Object >) value ;
105+ var counts = ((Collection <Integer >) samplesObject .get ("counts" )).stream ().mapToInt (Integer ::intValue ).sum ();
106+ var remaining = histogramAssertion - counts ;
107+ if (remaining == 0 ) {
108+ logger .info ("{} assertion PASSED" , key );
109+ histogramAssertions .remove (key );
110+ } else {
111+ logger .info ("{} assertion PENDING: {} remaining" , key , remaining );
112+ histogramAssertions .put (key , remaining );
113+ }
96114 }
97115 });
98116 }
99117
100- if (sampleAssertions .isEmpty ()) {
118+ if (valueAssertions .isEmpty ()) {
101119 finished .countDown ();
102120 }
103121 };
@@ -106,18 +124,18 @@ public void testApmIntegration() throws Exception {
106124
107125 client ().performRequest (new Request ("GET" , "/_use_apm_metrics" ));
108126
109- assertTrue (
110- "Timeout when waiting for assertions to complete. Remaining assertions to match: " + sampleAssertions ,
111- finished . await ( 30 , TimeUnit . SECONDS )
112- );
127+ var completed = finished . await ( 30 , TimeUnit . SECONDS );
128+ var remainingAssertions = Stream . concat ( valueAssertions . keySet (). stream (), histogramAssertions . keySet (). stream ())
129+ . collect ( Collectors . joining ());
130+ assertTrue ( "Timeout when waiting for assertions to complete. Remaining assertions to match: " + remainingAssertions , completed );
113131 }
114132
115133 private <T > Map .Entry <String , Predicate <Map <String , Object >>> assertion (
116134 String sampleKeyName ,
117135 Function <Map <String , Object >, T > accessor ,
118136 Matcher <T > expected
119137 ) {
120- return Map . entry (sampleKeyName , new Predicate <>() {
138+ return entry (sampleKeyName , new Predicate <>() {
121139 @ Override
122140 public boolean test (Map <String , Object > sampleObject ) {
123141 return expected .matches (accessor .apply (sampleObject ));
0 commit comments