Skip to content
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions flink-autoscaler/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,12 @@ under the License.
<scope>test</scope>
</dependency>

<dependency>
<groupId>org.awaitility</groupId>
<artifactId>awaitility</artifactId>
<version>${awaitility.version}</version>
</dependency>

</dependencies>

<build>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,18 +44,21 @@
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;

import java.time.Duration;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;

import static org.assertj.core.api.Assertions.assertThat;
import static org.awaitility.Awaitility.await;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.fail;

/** Tests for {@link RestApiMetricsCollector}. */
public class RestApiMetricsCollectorTest {
class RestApiMetricsCollectorTest {

private static final String GC_METRIC_NAME = "Status.JVM.GarbageCollector.All.TimeMsPerSecond";
private static final String HEAP_MAX_NAME = "Status.JVM.Memory.Heap.Max";
Expand All @@ -64,7 +67,7 @@ public class RestApiMetricsCollectorTest {
private static final String METASPACE_MEMORY_NAME = "Status.JVM.Memory.Metaspace.Used";

@Test
public void testAggregateMultiplePendingRecordsMetricsPerSource() throws Exception {
void testAggregateMultiplePendingRecordsMetricsPerSource() throws Exception {
var collector = new RestApiMetricsCollector<JobID, JobAutoScalerContext<JobID>>();

JobVertexID jobVertexID = new JobVertexID();
Expand Down Expand Up @@ -145,7 +148,7 @@ CompletableFuture<P> sendRequest(

@Test
@Timeout(60)
public void testJmMetricCollection() throws Exception {
void testJmMetricCollection() throws Exception {
try (MiniCluster miniCluster =
new MiniCluster(
new MiniClusterConfiguration.Builder()
Expand All @@ -160,36 +163,38 @@ public void testJmMetricCollection() throws Exception {
(c, e) ->
new StandaloneClientHAServices(
miniCluster.getRestAddress().get().toString()));
do {
var collector = new RestApiMetricsCollector<>();
Map<FlinkMetric, Metric> flinkMetricMetricMap =
collector.queryJmMetrics(
client,
Map.of(
"taskSlotsTotal", FlinkMetric.NUM_TASK_SLOTS_TOTAL,
"taskSlotsAvailable",
FlinkMetric.NUM_TASK_SLOTS_AVAILABLE));
try {
assertEquals(
"3",
flinkMetricMetricMap.get(FlinkMetric.NUM_TASK_SLOTS_TOTAL).getValue());
assertEquals(
"3",
flinkMetricMetricMap
.get(FlinkMetric.NUM_TASK_SLOTS_AVAILABLE)
.getValue());
break;
} catch (NullPointerException e) {
// Metrics might not be available yet (timeout above will eventually kill this
// test)
Thread.sleep(100);
}
} while (true);
var collector = new RestApiMetricsCollector<>();
Map<FlinkMetric, Metric> flinkMetricMetricMap = new HashMap<>();
// Metrics might not be available yet so retry the query until it returns results or the
// timeout reached.
await().atMost(Duration.ofSeconds(60))
.until(
() -> {
final Map<FlinkMetric, Metric> results =
collector.queryJmMetrics(
client,
Map.of(
"taskSlotsTotal",
FlinkMetric.NUM_TASK_SLOTS_TOTAL,
"taskSlotsAvailable",
FlinkMetric.NUM_TASK_SLOTS_AVAILABLE));
flinkMetricMetricMap.putAll(results);
return !results.isEmpty();
});

assertThat(flinkMetricMetricMap)
.hasSize(2)
.hasEntrySatisfying(
FlinkMetric.NUM_TASK_SLOTS_TOTAL,
metricValue -> assertMetricValueIs(metricValue, 3))
.hasEntrySatisfying(
FlinkMetric.NUM_TASK_SLOTS_AVAILABLE,
metricValue -> assertMetricValueIs(metricValue, 3));
}
}

@Test
public void testTmMetricCollection() throws Exception {
void testTmMetricCollection() throws Exception {

var metricValues = new HashMap<String, AggregatedMetric>();

Expand Down Expand Up @@ -309,4 +314,8 @@ private static void assertMetricsEquals(
assertEquals(v.getSum(), a.getSum(), k.name());
});
}

private static void assertMetricValueIs(Metric metricValue, int expected) {
assertThat(metricValue.getValue()).asInt().isEqualTo(expected);
}
}
1 change: 0 additions & 1 deletion flink-kubernetes-operator-api/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@ under the License.
<packaging>jar</packaging>

<properties>
<awaitility.version>4.1.0</awaitility.version>
<plugins.tmp.dir>${project.build.directory}/plugins</plugins.tmp.dir>
</properties>

Expand Down
1 change: 0 additions & 1 deletion flink-kubernetes-operator/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@ under the License.
<packaging>jar</packaging>

<properties>
<awaitility.version>4.1.0</awaitility.version>
<plugins.tmp.dir>${project.build.directory}/plugins</plugins.tmp.dir>
<surefire.module.config>
<!-- required by FlinkConfigManagerTest -->
Expand Down
Loading