Skip to content

Commit bbb4f83

Browse files
committed
[FLINK-37193] Ingress recreated after perform a change to CR FlinkDeployment
In this alternative ingress is bound to the CR not to job, for proper delete added informer Signed-off-by: Attila Mészáros <[email protected]>
1 parent d5f4753 commit bbb4f83

File tree

6 files changed

+548
-512
lines changed

6 files changed

+548
-512
lines changed

flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentController.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -184,15 +184,14 @@ public List<EventSource<?, FlinkDeployment>> prepareEventSources(
184184
List<EventSource<?, FlinkDeployment>> eventSources = new ArrayList<>();
185185
eventSources.add(EventSourceUtils.getSessionJobInformerEventSource(context));
186186
eventSources.add(EventSourceUtils.getDeploymentInformerEventSource(context));
187-
187+
eventSources.add(EventSourceUtils.getIngressInformerEventSource(context));
188188
if (KubernetesClientUtils.isCrdInstalled(FlinkStateSnapshot.class)) {
189189
eventSources.add(
190190
EventSourceUtils.getStateSnapshotForFlinkResourceInformerEventSource(context));
191191
} else {
192192
LOG.warn(
193193
"Could not initialize informer for snapshots as the CRD has not been installed!");
194194
}
195-
196195
return eventSources;
197196
}
198197

flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconciler.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -190,8 +190,7 @@ public void deploy(
190190
status.getJobStatus().setState(org.apache.flink.api.common.JobStatus.RECONCILING);
191191
status.setJobManagerDeploymentStatus(JobManagerDeploymentStatus.DEPLOYING);
192192

193-
IngressUtils.updateIngressRules(
194-
relatedResource.getMetadata(), spec, deployConfig, ctx.getKubernetesClient());
193+
IngressUtils.updateIngressRules(ctx, spec, deployConfig, ctx.getKubernetesClient(), true);
195194
}
196195

197196
private void setJobIdIfNecessary(

flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/SessionReconciler.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -106,8 +106,7 @@ public void deploy(
106106
setOwnerReference(cr, deployConfig);
107107
ctx.getFlinkService().submitSessionCluster(deployConfig);
108108
cr.getStatus().setJobManagerDeploymentStatus(JobManagerDeploymentStatus.DEPLOYING);
109-
IngressUtils.updateIngressRules(
110-
cr.getMetadata(), spec, deployConfig, ctx.getKubernetesClient());
109+
IngressUtils.updateIngressRules(ctx, spec, deployConfig, ctx.getKubernetesClient(), false);
111110
}
112111

113112
@Override

flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/EventSourceUtils.java

Lines changed: 28 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717

1818
package org.apache.flink.kubernetes.operator.utils;
1919

20+
import io.fabric8.kubernetes.api.model.networking.v1.Ingress;
2021
import org.apache.flink.kubernetes.operator.api.AbstractFlinkResource;
2122
import org.apache.flink.kubernetes.operator.api.CrdConstants;
2223
import org.apache.flink.kubernetes.operator.api.FlinkDeployment;
@@ -45,6 +46,8 @@
4546
import java.util.stream.Collectors;
4647
import java.util.stream.Stream;
4748

49+
import static org.apache.flink.kubernetes.operator.utils.IngressUtils.ingressInNetworkingV1;
50+
4851
/** Utility class to locate secondary resources. */
4952
public class EventSourceUtils {
5053

@@ -95,7 +98,31 @@ public static InformerEventSource<Deployment, FlinkDeployment> getDeploymentInfo
9598
var configuration =
9699
InformerEventSourceConfiguration.from(Deployment.class, FlinkDeployment.class)
97100
.withLabelSelector(labelSelector)
98-
.withSecondaryToPrimaryMapper(fromLabel(Constants.LABEL_APP_KEY))
101+
.withNamespacesInheritedFromController()
102+
.withFollowControllerNamespacesChanges(true)
103+
.build();
104+
105+
return new InformerEventSource<>(configuration, context);
106+
}
107+
108+
public static InformerEventSource<?, FlinkDeployment> getIngressInformerEventSource(
109+
EventSourceContext<FlinkDeployment> context) {
110+
// final String labelSelector =
111+
// Map.of(Constants.LABEL_COMPONENT_KEY,
112+
// Constants.LABEL_COMPONENT_JOB_MANAGER)
113+
// .entrySet()
114+
// .stream()
115+
// .map(Object::toString)
116+
// .collect(Collectors.joining(","));
117+
118+
var ingressClass =
119+
ingressInNetworkingV1(context.getClient())
120+
? Ingress.class
121+
: io.fabric8.kubernetes.api.model.networking.v1beta1.Ingress.class;
122+
123+
var configuration =
124+
InformerEventSourceConfiguration.from(ingressClass, FlinkDeployment.class)
125+
// .withLabelSelector(labelSelector)
99126
.withNamespacesInheritedFromController()
100127
.withFollowControllerNamespacesChanges(true)
101128
.build();

flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/IngressUtils.java

Lines changed: 35 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
import org.apache.flink.configuration.Configuration;
2121
import org.apache.flink.configuration.RestOptions;
2222
import org.apache.flink.kubernetes.operator.api.spec.FlinkDeploymentSpec;
23+
import org.apache.flink.kubernetes.operator.controller.FlinkResourceContext;
2324
import org.apache.flink.kubernetes.operator.exception.ReconciliationException;
2425
import org.apache.flink.util.Preconditions;
2526

@@ -33,8 +34,10 @@
3334
import io.fabric8.kubernetes.api.model.networking.v1.IngressBuilder;
3435
import io.fabric8.kubernetes.api.model.networking.v1.IngressRule;
3536
import io.fabric8.kubernetes.api.model.networking.v1.IngressRuleBuilder;
37+
import io.fabric8.kubernetes.api.model.networking.v1beta1.Ingress;
3638
import io.fabric8.kubernetes.api.model.networking.v1beta1.IngressTLS;
3739
import io.fabric8.kubernetes.client.KubernetesClient;
40+
import io.fabric8.kubernetes.client.dsl.NonDeletingOperation;
3841
import org.apache.commons.lang3.StringUtils;
3942
import org.slf4j.Logger;
4043
import org.slf4j.LoggerFactory;
@@ -63,28 +66,47 @@ public class IngressUtils {
6366
private static final Logger LOG = LoggerFactory.getLogger(IngressUtils.class);
6467

6568
public static void updateIngressRules(
66-
ObjectMeta objectMeta,
69+
FlinkResourceContext<?> ctx,
6770
FlinkDeploymentSpec spec,
6871
Configuration effectiveConfig,
69-
KubernetesClient client) {
72+
KubernetesClient client,
73+
boolean usePrimaryAsOwner) {
7074

75+
var objectMeta = ctx.getResource().getMetadata();
7176
if (spec.getIngress() != null) {
7277
HasMetadata ingress = getIngress(objectMeta, spec, effectiveConfig, client);
73-
74-
Deployment deployment =
75-
client.apps()
76-
.deployments()
77-
.inNamespace(objectMeta.getNamespace())
78-
.withName(objectMeta.getName())
79-
.get();
80-
if (deployment == null) {
81-
LOG.error("Could not find deployment {}", objectMeta.getName());
78+
if (usePrimaryAsOwner) {
79+
setOwnerReference(ctx.getResource(), Collections.singletonList(ingress));
8280
} else {
83-
setOwnerReference(deployment, Collections.singletonList(ingress));
81+
Deployment deployment =
82+
client.apps()
83+
.deployments()
84+
.inNamespace(objectMeta.getNamespace())
85+
.withName(objectMeta.getName())
86+
.get();
87+
if (deployment == null) {
88+
LOG.error("Could not find deployment {}", objectMeta.getName());
89+
} else {
90+
setOwnerReference(deployment, Collections.singletonList(ingress));
91+
}
8492
}
8593

8694
LOG.info("Updating ingress rules {}", ingress);
87-
client.resourceList(ingress).inNamespace(objectMeta.getNamespace()).createOrReplace();
95+
client.resource(ingress)
96+
.inNamespace(objectMeta.getNamespace())
97+
.createOr(NonDeletingOperation::update);
98+
} else if (usePrimaryAsOwner) {
99+
Optional<? extends HasMetadata> ingress;
100+
if (ingressInNetworkingV1(client)) {
101+
ingress =
102+
ctx.getJosdkContext()
103+
.getSecondaryResource(
104+
io.fabric8.kubernetes.api.model.networking.v1.Ingress
105+
.class);
106+
} else {
107+
ingress = ctx.getJosdkContext().getSecondaryResource(Ingress.class);
108+
}
109+
ingress.ifPresent(i -> ctx.getKubernetesClient().resource(i).delete());
88110
}
89111
}
90112

0 commit comments

Comments
 (0)