Skip to content

Commit b0c7f1a

Browse files
authored
[FLINK-36473] Wait for metrics before asserting on them
1 parent 5bce141 commit b0c7f1a

File tree

6 files changed

+237
-267
lines changed

6 files changed

+237
-267
lines changed

flink-autoscaler/pom.xml

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -107,6 +107,13 @@ under the License.
107107
<scope>test</scope>
108108
</dependency>
109109

110+
<dependency>
111+
<groupId>org.awaitility</groupId>
112+
<artifactId>awaitility</artifactId>
113+
<version>${awaitility.version}</version>
114+
<scope>test</scope>
115+
</dependency>
116+
110117
</dependencies>
111118

112119
<build>

flink-autoscaler/src/test/java/org/apache/flink/autoscaler/RestApiMetricsCollectorTest.java

Lines changed: 38 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -44,18 +44,21 @@
4444
import org.junit.jupiter.api.Test;
4545
import org.junit.jupiter.api.Timeout;
4646

47+
import java.time.Duration;
4748
import java.util.HashMap;
4849
import java.util.List;
4950
import java.util.Map;
5051
import java.util.concurrent.CompletableFuture;
5152
import java.util.stream.Collectors;
5253

54+
import static org.assertj.core.api.Assertions.assertThat;
55+
import static org.awaitility.Awaitility.await;
5356
import static org.junit.jupiter.api.Assertions.assertEquals;
5457
import static org.junit.jupiter.api.Assertions.assertThrows;
5558
import static org.junit.jupiter.api.Assertions.fail;
5659

5760
/** Tests for {@link RestApiMetricsCollector}. */
58-
public class RestApiMetricsCollectorTest {
61+
class RestApiMetricsCollectorTest {
5962

6063
private static final String GC_METRIC_NAME = "Status.JVM.GarbageCollector.All.TimeMsPerSecond";
6164
private static final String HEAP_MAX_NAME = "Status.JVM.Memory.Heap.Max";
@@ -64,7 +67,7 @@ public class RestApiMetricsCollectorTest {
6467
private static final String METASPACE_MEMORY_NAME = "Status.JVM.Memory.Metaspace.Used";
6568

6669
@Test
67-
public void testAggregateMultiplePendingRecordsMetricsPerSource() throws Exception {
70+
void testAggregateMultiplePendingRecordsMetricsPerSource() throws Exception {
6871
var collector = new RestApiMetricsCollector<JobID, JobAutoScalerContext<JobID>>();
6972

7073
JobVertexID jobVertexID = new JobVertexID();
@@ -145,7 +148,7 @@ CompletableFuture<P> sendRequest(
145148

146149
@Test
147150
@Timeout(60)
148-
public void testJmMetricCollection() throws Exception {
151+
void testJmMetricCollection() throws Exception {
149152
try (MiniCluster miniCluster =
150153
new MiniCluster(
151154
new MiniClusterConfiguration.Builder()
@@ -160,36 +163,38 @@ public void testJmMetricCollection() throws Exception {
160163
(c, e) ->
161164
new StandaloneClientHAServices(
162165
miniCluster.getRestAddress().get().toString()));
163-
do {
164-
var collector = new RestApiMetricsCollector<>();
165-
Map<FlinkMetric, Metric> flinkMetricMetricMap =
166-
collector.queryJmMetrics(
167-
client,
168-
Map.of(
169-
"taskSlotsTotal", FlinkMetric.NUM_TASK_SLOTS_TOTAL,
170-
"taskSlotsAvailable",
171-
FlinkMetric.NUM_TASK_SLOTS_AVAILABLE));
172-
try {
173-
assertEquals(
174-
"3",
175-
flinkMetricMetricMap.get(FlinkMetric.NUM_TASK_SLOTS_TOTAL).getValue());
176-
assertEquals(
177-
"3",
178-
flinkMetricMetricMap
179-
.get(FlinkMetric.NUM_TASK_SLOTS_AVAILABLE)
180-
.getValue());
181-
break;
182-
} catch (NullPointerException e) {
183-
// Metrics might not be available yet (timeout above will eventually kill this
184-
// test)
185-
Thread.sleep(100);
186-
}
187-
} while (true);
166+
var collector = new RestApiMetricsCollector<>();
167+
Map<FlinkMetric, Metric> flinkMetricMetricMap = new HashMap<>();
168+
// Metrics might not be available yet so retry the query until it returns results or the
169+
// timeout reached.
170+
await().atMost(Duration.ofSeconds(60))
171+
.until(
172+
() -> {
173+
final Map<FlinkMetric, Metric> results =
174+
collector.queryJmMetrics(
175+
client,
176+
Map.of(
177+
"taskSlotsTotal",
178+
FlinkMetric.NUM_TASK_SLOTS_TOTAL,
179+
"taskSlotsAvailable",
180+
FlinkMetric.NUM_TASK_SLOTS_AVAILABLE));
181+
flinkMetricMetricMap.putAll(results);
182+
return !results.isEmpty();
183+
});
184+
185+
assertThat(flinkMetricMetricMap)
186+
.hasSize(2)
187+
.hasEntrySatisfying(
188+
FlinkMetric.NUM_TASK_SLOTS_TOTAL,
189+
metricValue -> assertMetricValueIs(metricValue, 3))
190+
.hasEntrySatisfying(
191+
FlinkMetric.NUM_TASK_SLOTS_AVAILABLE,
192+
metricValue -> assertMetricValueIs(metricValue, 3));
188193
}
189194
}
190195

191196
@Test
192-
public void testTmMetricCollection() throws Exception {
197+
void testTmMetricCollection() throws Exception {
193198

194199
var metricValues = new HashMap<String, AggregatedMetric>();
195200

@@ -309,4 +314,8 @@ private static void assertMetricsEquals(
309314
assertEquals(v.getSum(), a.getSum(), k.name());
310315
});
311316
}
317+
318+
private static void assertMetricValueIs(Metric metricValue, int expected) {
319+
assertThat(metricValue.getValue()).asInt().isEqualTo(expected);
320+
}
312321
}

flink-kubernetes-operator-api/pom.xml

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,6 @@ under the License.
3232
<packaging>jar</packaging>
3333

3434
<properties>
35-
<awaitility.version>4.1.0</awaitility.version>
3635
<plugins.tmp.dir>${project.build.directory}/plugins</plugins.tmp.dir>
3736
</properties>
3837

flink-kubernetes-operator/pom.xml

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,6 @@ under the License.
3232
<packaging>jar</packaging>
3333

3434
<properties>
35-
<awaitility.version>4.1.0</awaitility.version>
3635
<plugins.tmp.dir>${project.build.directory}/plugins</plugins.tmp.dir>
3736
<surefire.module.config>
3837
<!-- required by FlinkConfigManagerTest -->

0 commit comments

Comments
 (0)