4444import org .junit .jupiter .api .Test ;
4545import org .junit .jupiter .api .Timeout ;
4646
47+ import java .time .Duration ;
4748import java .util .HashMap ;
4849import java .util .List ;
4950import java .util .Map ;
5051import java .util .concurrent .CompletableFuture ;
5152import java .util .stream .Collectors ;
5253
54+ import static org .assertj .core .api .Assertions .assertThat ;
55+ import static org .awaitility .Awaitility .await ;
5356import static org .junit .jupiter .api .Assertions .assertEquals ;
5457import static org .junit .jupiter .api .Assertions .assertThrows ;
5558import static org .junit .jupiter .api .Assertions .fail ;
5659
5760/** Tests for {@link RestApiMetricsCollector}. */
58- public class RestApiMetricsCollectorTest {
61+ class RestApiMetricsCollectorTest {
5962
6063 private static final String GC_METRIC_NAME = "Status.JVM.GarbageCollector.All.TimeMsPerSecond" ;
6164 private static final String HEAP_MAX_NAME = "Status.JVM.Memory.Heap.Max" ;
@@ -64,7 +67,7 @@ public class RestApiMetricsCollectorTest {
6467 private static final String METASPACE_MEMORY_NAME = "Status.JVM.Memory.Metaspace.Used" ;
6568
6669 @ Test
67- public void testAggregateMultiplePendingRecordsMetricsPerSource () throws Exception {
70+ void testAggregateMultiplePendingRecordsMetricsPerSource () throws Exception {
6871 var collector = new RestApiMetricsCollector <JobID , JobAutoScalerContext <JobID >>();
6972
7073 JobVertexID jobVertexID = new JobVertexID ();
@@ -145,7 +148,7 @@ CompletableFuture<P> sendRequest(
145148
146149 @ Test
147150 @ Timeout (60 )
148- public void testJmMetricCollection () throws Exception {
151+ void testJmMetricCollection () throws Exception {
149152 try (MiniCluster miniCluster =
150153 new MiniCluster (
151154 new MiniClusterConfiguration .Builder ()
@@ -160,36 +163,38 @@ public void testJmMetricCollection() throws Exception {
160163 (c , e ) ->
161164 new StandaloneClientHAServices (
162165 miniCluster .getRestAddress ().get ().toString ()));
163- do {
164- var collector = new RestApiMetricsCollector <>();
165- Map <FlinkMetric , Metric > flinkMetricMetricMap =
166- collector .queryJmMetrics (
167- client ,
168- Map .of (
169- "taskSlotsTotal" , FlinkMetric .NUM_TASK_SLOTS_TOTAL ,
170- "taskSlotsAvailable" ,
171- FlinkMetric .NUM_TASK_SLOTS_AVAILABLE ));
172- try {
173- assertEquals (
174- "3" ,
175- flinkMetricMetricMap .get (FlinkMetric .NUM_TASK_SLOTS_TOTAL ).getValue ());
176- assertEquals (
177- "3" ,
178- flinkMetricMetricMap
179- .get (FlinkMetric .NUM_TASK_SLOTS_AVAILABLE )
180- .getValue ());
181- break ;
182- } catch (NullPointerException e ) {
183- // Metrics might not be available yet (timeout above will eventually kill this
184- // test)
185- Thread .sleep (100 );
186- }
187- } while (true );
166+ var collector = new RestApiMetricsCollector <>();
167+ Map <FlinkMetric , Metric > flinkMetricMetricMap = new HashMap <>();
168+ // Metrics might not be available yet so retry the query until it returns results or the
169+ // timeout reached.
170+ await ().atMost (Duration .ofSeconds (60 ))
171+ .until (
172+ () -> {
173+ final Map <FlinkMetric , Metric > results =
174+ collector .queryJmMetrics (
175+ client ,
176+ Map .of (
177+ "taskSlotsTotal" ,
178+ FlinkMetric .NUM_TASK_SLOTS_TOTAL ,
179+ "taskSlotsAvailable" ,
180+ FlinkMetric .NUM_TASK_SLOTS_AVAILABLE ));
181+ flinkMetricMetricMap .putAll (results );
182+ return !results .isEmpty ();
183+ });
184+
185+ assertThat (flinkMetricMetricMap )
186+ .hasSize (2 )
187+ .hasEntrySatisfying (
188+ FlinkMetric .NUM_TASK_SLOTS_TOTAL ,
189+ metricValue -> assertMetricValueIs (metricValue , 3 ))
190+ .hasEntrySatisfying (
191+ FlinkMetric .NUM_TASK_SLOTS_AVAILABLE ,
192+ metricValue -> assertMetricValueIs (metricValue , 3 ));
188193 }
189194 }
190195
191196 @ Test
192- public void testTmMetricCollection () throws Exception {
197+ void testTmMetricCollection () throws Exception {
193198
194199 var metricValues = new HashMap <String , AggregatedMetric >();
195200
@@ -309,4 +314,8 @@ private static void assertMetricsEquals(
309314 assertEquals (v .getSum (), a .getSum (), k .name ());
310315 });
311316 }
317+
318+ private static void assertMetricValueIs (Metric metricValue , int expected ) {
319+ assertThat (metricValue .getValue ()).asInt ().isEqualTo (expected );
320+ }
312321}
0 commit comments