2525import org .apache .flink .kubernetes .configuration .KubernetesConfigOptions ;
2626import org .apache .flink .kubernetes .highavailability .KubernetesHaServicesFactory ;
2727import org .apache .flink .kubernetes .kubeclient .parameters .KubernetesJobManagerParameters ;
28+ import org .apache .flink .kubernetes .operator .autoscaler .state .ConfigMapStore ;
2829import org .apache .flink .kubernetes .operator .reconciler .ReconciliationUtils ;
2930import org .apache .flink .kubernetes .utils .Constants ;
3031import org .apache .flink .kubernetes .utils .KubernetesUtils ;
4748import io .fabric8 .kubernetes .api .model .apps .Deployment ;
4849import io .fabric8 .kubernetes .api .model .apps .DeploymentCondition ;
4950import io .fabric8 .kubernetes .client .KubernetesClient ;
51+ import io .fabric8 .kubernetes .client .dsl .FilterWatchListDeletable ;
52+ import io .fabric8 .kubernetes .client .dsl .Resource ;
5053import io .javaoperatorsdk .operator .api .reconciler .Context ;
5154import org .slf4j .Logger ;
5255import org .slf4j .LoggerFactory ;
6063import java .util .Optional ;
6164import java .util .function .Predicate ;
6265
63- import static org .apache .flink .kubernetes .utils .Constants .LABEL_CONFIGMAP_TYPE_HIGH_AVAILABILITY ;
64-
6566/** Flink Utility methods used by the operator. */
6667public class FlinkUtils {
6768
@@ -214,13 +215,20 @@ public static void deleteZookeeperHAMetadata(Configuration conf) {
214215
215216 public static void deleteKubernetesHAMetadata (
216217 String clusterId , String namespace , KubernetesClient kubernetesClient ) {
217- kubernetesClient
218+ getFlinkKubernetesHaConfigmaps (clusterId , namespace , kubernetesClient ).delete ();
219+ }
220+
221+ private static FilterWatchListDeletable <ConfigMap , ConfigMapList , Resource <ConfigMap >>
222+ getFlinkKubernetesHaConfigmaps (
223+ String clusterId , String namespace , KubernetesClient kubernetesClient ) {
224+ return kubernetesClient
218225 .configMaps ()
219226 .inNamespace (namespace )
220- .withLabels (
221- KubernetesUtils .getConfigMapLabels (
222- clusterId , LABEL_CONFIGMAP_TYPE_HIGH_AVAILABILITY ))
223- .delete ();
227+ .withNewFilter ()
228+ .withLabels (KubernetesUtils .getCommonLabels (clusterId ))
229+ .withoutLabel (
230+ Constants .LABEL_COMPONENT_KEY , ConfigMapStore .LABEL_COMPONENT_AUTOSCALER )
231+ .endFilter ();
224232 }
225233
226234 public static void deleteJobGraphInZookeeperHA (Configuration conf ) throws Exception {
@@ -233,16 +241,8 @@ public static void deleteJobGraphInZookeeperHA(Configuration conf) throws Except
233241
234242 public static void deleteJobGraphInKubernetesHA (
235243 String clusterId , String namespace , KubernetesClient kubernetesClient ) {
236- // The HA ConfigMap names have been changed from 1.15, so we use the labels to filter out
237- // them and delete job graph key
238- final Map <String , String > haConfigMapLabels =
239- KubernetesUtils .getConfigMapLabels (
240- clusterId , Constants .LABEL_CONFIGMAP_TYPE_HIGH_AVAILABILITY );
241- final ConfigMapList configMaps =
242- kubernetesClient
243- .configMaps ()
244- .inNamespace (namespace )
245- .withLabels (haConfigMapLabels )
244+ var configMaps =
245+ FlinkUtils .getFlinkKubernetesHaConfigmaps (clusterId , namespace , kubernetesClient )
246246 .list ();
247247
248248 boolean shouldUpdate = false ;
@@ -303,14 +303,11 @@ private static boolean isKubernetesHaMetadataAvailable(
303303 KubernetesClient kubernetesClient ,
304304 Predicate <ConfigMap > cmPredicate ) {
305305
306- String clusterId = conf .get (KubernetesConfigOptions .CLUSTER_ID );
307- String namespace = conf .get (KubernetesConfigOptions .NAMESPACE );
308-
309306 var configMaps =
310- kubernetesClient
311- . configMaps ()
312- . inNamespace ( namespace )
313- . withLabels ( KubernetesUtils . getCommonLabels ( clusterId ) )
307+ FlinkUtils . getFlinkKubernetesHaConfigmaps (
308+ conf . get ( KubernetesConfigOptions . CLUSTER_ID ),
309+ conf . get ( KubernetesConfigOptions . NAMESPACE ),
310+ kubernetesClient )
314311 .list ()
315312 .getItems ();
316313
0 commit comments