Skip to content

Commit f01fbc3

Browse files
committed
[FLINK-36473] Make testJmMetricCollection more robust in the face of missing metrics
1 parent 4f87bc2 commit f01fbc3

File tree

1 file changed

+38
-29
lines changed

1 file changed

+38
-29
lines changed

flink-autoscaler/src/test/java/org/apache/flink/autoscaler/RestApiMetricsCollectorTest.java

Lines changed: 38 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -44,18 +44,21 @@
4444
import org.junit.jupiter.api.Test;
4545
import org.junit.jupiter.api.Timeout;
4646

47+
import java.time.Duration;
4748
import java.util.HashMap;
4849
import java.util.List;
4950
import java.util.Map;
5051
import java.util.concurrent.CompletableFuture;
5152
import java.util.stream.Collectors;
5253

54+
import static org.assertj.core.api.Assertions.assertThat;
5355
import static org.junit.jupiter.api.Assertions.assertEquals;
5456
import static org.junit.jupiter.api.Assertions.assertThrows;
5557
import static org.junit.jupiter.api.Assertions.fail;
58+
import static org.testcontainers.shaded.org.awaitility.Awaitility.await;
5659

5760
/** Tests for {@link RestApiMetricsCollector}. */
58-
public class RestApiMetricsCollectorTest {
61+
class RestApiMetricsCollectorTest {
5962

6063
private static final String GC_METRIC_NAME = "Status.JVM.GarbageCollector.All.TimeMsPerSecond";
6164
private static final String HEAP_MAX_NAME = "Status.JVM.Memory.Heap.Max";
@@ -64,7 +67,7 @@ public class RestApiMetricsCollectorTest {
6467
private static final String METASPACE_MEMORY_NAME = "Status.JVM.Memory.Metaspace.Used";
6568

6669
@Test
67-
public void testAggregateMultiplePendingRecordsMetricsPerSource() throws Exception {
70+
void testAggregateMultiplePendingRecordsMetricsPerSource() throws Exception {
6871
var collector = new RestApiMetricsCollector<JobID, JobAutoScalerContext<JobID>>();
6972

7073
JobVertexID jobVertexID = new JobVertexID();
@@ -145,7 +148,7 @@ CompletableFuture<P> sendRequest(
145148

146149
@Test
147150
@Timeout(60)
148-
public void testJmMetricCollection() throws Exception {
151+
void testJmMetricCollection() throws Exception {
149152
try (MiniCluster miniCluster =
150153
new MiniCluster(
151154
new MiniClusterConfiguration.Builder()
@@ -160,36 +163,38 @@ public void testJmMetricCollection() throws Exception {
160163
(c, e) ->
161164
new StandaloneClientHAServices(
162165
miniCluster.getRestAddress().get().toString()));
163-
do {
164-
var collector = new RestApiMetricsCollector<>();
165-
Map<FlinkMetric, Metric> flinkMetricMetricMap =
166-
collector.queryJmMetrics(
167-
client,
168-
Map.of(
169-
"taskSlotsTotal", FlinkMetric.NUM_TASK_SLOTS_TOTAL,
170-
"taskSlotsAvailable",
171-
FlinkMetric.NUM_TASK_SLOTS_AVAILABLE));
172-
try {
173-
assertEquals(
174-
"3",
175-
flinkMetricMetricMap.get(FlinkMetric.NUM_TASK_SLOTS_TOTAL).getValue());
176-
assertEquals(
177-
"3",
178-
flinkMetricMetricMap
179-
.get(FlinkMetric.NUM_TASK_SLOTS_AVAILABLE)
180-
.getValue());
181-
break;
182-
} catch (NullPointerException e) {
183-
// Metrics might not be available yet (timeout above will eventually kill this
184-
// test)
185-
Thread.sleep(100);
186-
}
187-
} while (true);
166+
var collector = new RestApiMetricsCollector<>();
167+
Map<FlinkMetric, Metric> flinkMetricMetricMap = new HashMap<>();
168+
// Metrics might not be available yet so retry the query until it returns results or the
169+
// timeout reached.
170+
await().atMost(Duration.ofSeconds(60))
171+
.until(
172+
() -> {
173+
final Map<FlinkMetric, Metric> results =
174+
collector.queryJmMetrics(
175+
client,
176+
Map.of(
177+
"taskSlotsTotal",
178+
FlinkMetric.NUM_TASK_SLOTS_TOTAL,
179+
"taskSlotsAvailable",
180+
FlinkMetric.NUM_TASK_SLOTS_AVAILABLE));
181+
flinkMetricMetricMap.putAll(results);
182+
return !results.isEmpty();
183+
});
184+
185+
assertThat(flinkMetricMetricMap)
186+
.hasSize(2)
187+
.hasEntrySatisfying(
188+
FlinkMetric.NUM_TASK_SLOTS_TOTAL,
189+
metricValue -> assertMetricValueIs(metricValue, 3))
190+
.hasEntrySatisfying(
191+
FlinkMetric.NUM_TASK_SLOTS_AVAILABLE,
192+
metricValue -> assertMetricValueIs(metricValue, 3));
188193
}
189194
}
190195

191196
@Test
192-
public void testTmMetricCollection() throws Exception {
197+
void testTmMetricCollection() throws Exception {
193198

194199
var metricValues = new HashMap<String, AggregatedMetric>();
195200

@@ -309,4 +314,8 @@ private static void assertMetricsEquals(
309314
assertEquals(v.getSum(), a.getSum(), k.name());
310315
});
311316
}
317+
318+
private static void assertMetricValueIs(Metric metricValue, int expected) {
319+
assertThat(metricValue.getValue()).asInt().isEqualTo(expected);
320+
}
312321
}

0 commit comments

Comments
 (0)