diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/FlinkUtils.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/FlinkUtils.java index 4c41aedf8f..dbd68ba4ed 100644 --- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/FlinkUtils.java +++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/FlinkUtils.java @@ -335,7 +335,8 @@ private static boolean checkpointExists(ConfigMap cm) { } private static boolean isJobGraphKey(Map.Entry entry) { - return entry.getKey().startsWith(Constants.JOB_GRAPH_STORE_KEY_PREFIX); + return entry.getKey().startsWith(Constants.JOB_GRAPH_STORE_KEY_PREFIX) + || entry.getKey().startsWith("executionPlan-"); } public static boolean isZookeeperHAActivated(Configuration configuration) { diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/utils/FlinkUtilsTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/utils/FlinkUtilsTest.java index 68ee3e3d95..003557f91e 100644 --- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/utils/FlinkUtilsTest.java +++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/utils/FlinkUtilsTest.java @@ -18,7 +18,6 @@ package org.apache.flink.kubernetes.operator.utils; -import org.apache.flink.api.common.JobID; import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.CoreOptions; import org.apache.flink.configuration.HighAvailabilityOptions; @@ -51,6 +50,8 @@ import io.fabric8.kubernetes.client.server.mock.KubernetesMockServer; import io.javaoperatorsdk.operator.processing.event.ResourceID; import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.ValueSource; import java.net.HttpURLConnection; import java.util.Collections; @@ -125,12 +126,13 @@ public void testAddStartupProbe() { assertEquals(expectedProbe, pod.getSpec().getContainers().get(1).getStartupProbe()); } - @Test - public void testDeleteJobGraphInKubernetesHA() { + @ParameterizedTest + @ValueSource(strings = {"jobGraph-jobId", "executionPlan-jobId"}) + public void testDeleteJobGraphInKubernetesHA(String key) { final String name = "ha-configmap"; final String clusterId = "cluster-id"; final Map data = new HashMap<>(); - data.put(Constants.JOB_GRAPH_STORE_KEY_PREFIX + JobID.generate(), "job-graph-data"); + data.put(key, "job-graph-data"); data.put("leader", "localhost"); createHAConfigMapWithData(name, kubernetesClient.getNamespace(), clusterId, data); assertNotNull(kubernetesClient.configMaps().withName(name).get());