|
27 | 27 | import org.apache.flink.configuration.TaskManagerOptions; |
28 | 28 | import org.apache.flink.kubernetes.configuration.KubernetesConfigOptions; |
29 | 29 | import org.apache.flink.kubernetes.operator.TestUtils; |
| 30 | +import org.apache.flink.kubernetes.operator.autoscaler.state.ConfigMapStore; |
30 | 31 | import org.apache.flink.kubernetes.operator.config.FlinkConfigManager; |
31 | 32 | import org.apache.flink.kubernetes.utils.Constants; |
32 | 33 | import org.apache.flink.kubernetes.utils.KubernetesUtils; |
33 | 34 | import org.apache.flink.runtime.jobmanager.HighAvailabilityMode; |
34 | 35 |
|
35 | | -import io.fabric8.kubernetes.api.model.ConfigMap; |
36 | 36 | import io.fabric8.kubernetes.api.model.ConfigMapBuilder; |
37 | 37 | import io.fabric8.kubernetes.api.model.Container; |
38 | 38 | import io.fabric8.kubernetes.api.model.EmptyDirVolumeSource; |
|
49 | 49 | import io.fabric8.kubernetes.client.KubernetesClient; |
50 | 50 | import io.fabric8.kubernetes.client.server.mock.EnableKubernetesMockClient; |
51 | 51 | import io.fabric8.kubernetes.client.server.mock.KubernetesMockServer; |
| 52 | +import io.javaoperatorsdk.operator.processing.event.ResourceID; |
52 | 53 | import org.junit.jupiter.api.Test; |
53 | 54 |
|
54 | 55 | import java.net.HttpURLConnection; |
@@ -403,21 +404,39 @@ public void testMergePodUsingArrayName() { |
403 | 404 | assertEquals(List.of(v1merged, volume2, volume3), mergedPod.getSpec().getVolumes()); |
404 | 405 | } |
405 | 406 |
|
| 407 | + @Test |
| 408 | + public void testKubernetesHaMetaDeletion() { |
| 409 | + var clusterId = "cluster-id"; |
| 410 | + var ns = kubernetesClient.getNamespace(); |
| 411 | + createHAConfigMapWithData("test", ns, clusterId, Map.of("k", "v")); |
| 412 | + |
| 413 | + // Create autoscaler configmap to test that it's not deleted |
| 414 | + var autoscalerData = |
| 415 | + new ConfigMapBuilder() |
| 416 | + .withNewMetadata() |
| 417 | + .withName("autoscaler-cm") |
| 418 | + .withNamespace(ns) |
| 419 | + .withLabels(ConfigMapStore.getAutoscalerCmLabels(new ResourceID(clusterId))) |
| 420 | + .endMetadata() |
| 421 | + .withData(Map.of("a", "s")) |
| 422 | + .build(); |
| 423 | + kubernetesClient.resource(autoscalerData).create(); |
| 424 | + FlinkUtils.deleteKubernetesHAMetadata(clusterId, ns, kubernetesClient); |
| 425 | + assertNotNull(kubernetesClient.resource(autoscalerData).get()); |
| 426 | + } |
| 427 | + |
406 | 428 | private void createHAConfigMapWithData( |
407 | 429 | String configMapName, String namespace, String clusterId, Map<String, String> data) { |
408 | | - final ConfigMap kubernetesConfigMap = |
| 430 | + var cm = |
409 | 431 | new ConfigMapBuilder() |
410 | 432 | .withNewMetadata() |
411 | 433 | .withName(configMapName) |
412 | 434 | .withNamespace(namespace) |
413 | | - .withLabels( |
414 | | - KubernetesUtils.getConfigMapLabels( |
415 | | - clusterId, |
416 | | - Constants.LABEL_CONFIGMAP_TYPE_HIGH_AVAILABILITY)) |
| 435 | + .withLabels(KubernetesUtils.getCommonLabels(clusterId)) |
417 | 436 | .endMetadata() |
418 | 437 | .withData(data) |
419 | 438 | .build(); |
420 | 439 |
|
421 | | - kubernetesClient.configMaps().resource(kubernetesConfigMap).createOrReplace(); |
| 440 | + kubernetesClient.configMaps().resource(cm).createOrReplace(); |
422 | 441 | } |
423 | 442 | } |
0 commit comments