4444import org .junit .jupiter .api .Test ;
4545import org .junit .jupiter .api .Timeout ;
4646
47+ import java .time .Duration ;
4748import java .util .HashMap ;
4849import java .util .List ;
4950import java .util .Map ;
5051import java .util .concurrent .CompletableFuture ;
5152import java .util .stream .Collectors ;
5253
54+ import static org .assertj .core .api .Assertions .assertThat ;
5355import static org .junit .jupiter .api .Assertions .assertEquals ;
5456import static org .junit .jupiter .api .Assertions .assertThrows ;
5557import static org .junit .jupiter .api .Assertions .fail ;
58+ import static org .testcontainers .shaded .org .awaitility .Awaitility .await ;
5659
5760/** Tests for {@link RestApiMetricsCollector}. */
58- public class RestApiMetricsCollectorTest {
61+ class RestApiMetricsCollectorTest {
5962
6063 private static final String GC_METRIC_NAME = "Status.JVM.GarbageCollector.All.TimeMsPerSecond" ;
6164 private static final String HEAP_MAX_NAME = "Status.JVM.Memory.Heap.Max" ;
@@ -64,7 +67,7 @@ public class RestApiMetricsCollectorTest {
6467 private static final String METASPACE_MEMORY_NAME = "Status.JVM.Memory.Metaspace.Used" ;
6568
6669 @ Test
67- public void testAggregateMultiplePendingRecordsMetricsPerSource () throws Exception {
70+ void testAggregateMultiplePendingRecordsMetricsPerSource () throws Exception {
6871 var collector = new RestApiMetricsCollector <JobID , JobAutoScalerContext <JobID >>();
6972
7073 JobVertexID jobVertexID = new JobVertexID ();
@@ -145,7 +148,7 @@ CompletableFuture<P> sendRequest(
145148
146149 @ Test
147150 @ Timeout (60 )
148- public void testJmMetricCollection () throws Exception {
151+ void testJmMetricCollection () throws Exception {
149152 try (MiniCluster miniCluster =
150153 new MiniCluster (
151154 new MiniClusterConfiguration .Builder ()
@@ -160,36 +163,33 @@ public void testJmMetricCollection() throws Exception {
160163 (c , e ) ->
161164 new StandaloneClientHAServices (
162165 miniCluster .getRestAddress ().get ().toString ()));
163- do {
164- var collector = new RestApiMetricsCollector <>();
165- Map <FlinkMetric , Metric > flinkMetricMetricMap =
166- collector .queryJmMetrics (
167- client ,
168- Map .of (
169- "taskSlotsTotal" , FlinkMetric .NUM_TASK_SLOTS_TOTAL ,
170- "taskSlotsAvailable" ,
166+ var collector = new RestApiMetricsCollector <>();
167+ Map <FlinkMetric , Metric > flinkMetricMetricMap = new HashMap <>();
168+ // Metrics might not be available yet so retry the query until it returns results or the timeout reached.
169+ await ().atMost (Duration .ofSeconds (60 ))
170+ .until (
171+ () -> {
172+ final Map <FlinkMetric , Metric > results = collector .queryJmMetrics (
173+ client ,
174+ Map .of (
175+ "taskSlotsTotal" , FlinkMetric .NUM_TASK_SLOTS_TOTAL ,
176+ "taskSlotsAvailable" ,
171177 FlinkMetric .NUM_TASK_SLOTS_AVAILABLE ));
172- try {
173- assertEquals (
174- "3" ,
175- flinkMetricMetricMap .get (FlinkMetric .NUM_TASK_SLOTS_TOTAL ).getValue ());
176- assertEquals (
177- "3" ,
178- flinkMetricMetricMap
179- .get (FlinkMetric .NUM_TASK_SLOTS_AVAILABLE )
180- .getValue ());
181- break ;
182- } catch (NullPointerException e ) {
183- // Metrics might not be available yet (timeout above will eventually kill this
184- // test)
185- Thread .sleep (100 );
186- }
187- } while (true );
178+ flinkMetricMetricMap .putAll (results );
179+ return !results
180+ .isEmpty ();
181+ }
182+ );
183+
184+ assertThat (flinkMetricMetricMap )
185+ .hasSize (2 )
186+ .hasEntrySatisfying (FlinkMetric .NUM_TASK_SLOTS_TOTAL , metricValue -> assertMetricValueIs (metricValue , 3 ))
187+ .hasEntrySatisfying (FlinkMetric .NUM_TASK_SLOTS_AVAILABLE , metricValue -> assertMetricValueIs (metricValue , 3 ));
188188 }
189189 }
190190
191191 @ Test
192- public void testTmMetricCollection () throws Exception {
192+ void testTmMetricCollection () throws Exception {
193193
194194 var metricValues = new HashMap <String , AggregatedMetric >();
195195
@@ -309,4 +309,8 @@ private static void assertMetricsEquals(
309309 assertEquals (v .getSum (), a .getSum (), k .name ());
310310 });
311311 }
312+
313+ private static void assertMetricValueIs (Metric metricValue , int expected ) {
314+ assertThat (metricValue .getValue ()).asInt ().isEqualTo (expected );
315+ }
312316}
0 commit comments