Skip to content

Commit bb58a0a

Browse files
authored
[FLINK-37193] Ingress recreated after perform a change to CR FlinkDeployment
1 parent 344cd9b commit bb58a0a

File tree

11 files changed

+282
-60
lines changed

11 files changed

+282
-60
lines changed

flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/FlinkOperator.java

Lines changed: 12 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -91,6 +91,11 @@ public class FlinkOperator {
9191
private final Configuration baseConfig;
9292

9393
public FlinkOperator(@Nullable Configuration conf) {
94+
this(conf, null);
95+
}
96+
97+
@VisibleForTesting
98+
FlinkOperator(@Nullable Configuration conf, KubernetesClient client) {
9499
this.configManager =
95100
conf != null
96101
? new FlinkConfigManager(conf) // For testing only
@@ -100,9 +105,13 @@ public FlinkOperator(@Nullable Configuration conf) {
100105

101106
baseConfig = configManager.getDefaultConfig();
102107
this.metricGroup = OperatorMetricUtils.initOperatorMetrics(baseConfig);
103-
this.client =
104-
KubernetesClientUtils.getKubernetesClient(
105-
configManager.getOperatorConfiguration(), this.metricGroup);
108+
if (client == null) {
109+
this.client =
110+
KubernetesClientUtils.getKubernetesClient(
111+
configManager.getOperatorConfiguration(), this.metricGroup);
112+
} else {
113+
this.client = client;
114+
}
106115
this.operator = createOperator();
107116
this.validators = ValidatorUtils.discoverValidators(configManager);
108117
this.listeners = ListenerUtils.discoverListeners(configManager);

flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentController.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -184,15 +184,14 @@ public List<EventSource<?, FlinkDeployment>> prepareEventSources(
184184
List<EventSource<?, FlinkDeployment>> eventSources = new ArrayList<>();
185185
eventSources.add(EventSourceUtils.getSessionJobInformerEventSource(context));
186186
eventSources.add(EventSourceUtils.getDeploymentInformerEventSource(context));
187-
187+
eventSources.add(EventSourceUtils.getIngressInformerEventSource(context));
188188
if (KubernetesClientUtils.isCrdInstalled(FlinkStateSnapshot.class)) {
189189
eventSources.add(
190190
EventSourceUtils.getStateSnapshotForFlinkResourceInformerEventSource(context));
191191
} else {
192192
LOG.warn(
193193
"Could not initialize informer for snapshots as the CRD has not been installed!");
194194
}
195-
196195
return eventSources;
197196
}
198197

flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconciler.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -190,8 +190,7 @@ public void deploy(
190190
status.getJobStatus().setState(org.apache.flink.api.common.JobStatus.RECONCILING);
191191
status.setJobManagerDeploymentStatus(JobManagerDeploymentStatus.DEPLOYING);
192192

193-
IngressUtils.updateIngressRules(
194-
relatedResource.getMetadata(), spec, deployConfig, ctx.getKubernetesClient());
193+
IngressUtils.reconcileIngress(ctx, spec, deployConfig, ctx.getKubernetesClient());
195194
}
196195

197196
private void setJobIdIfNecessary(

flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/SessionReconciler.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -106,8 +106,7 @@ public void deploy(
106106
setOwnerReference(cr, deployConfig);
107107
ctx.getFlinkService().submitSessionCluster(deployConfig);
108108
cr.getStatus().setJobManagerDeploymentStatus(JobManagerDeploymentStatus.DEPLOYING);
109-
IngressUtils.updateIngressRules(
110-
cr.getMetadata(), spec, deployConfig, ctx.getKubernetesClient());
109+
IngressUtils.reconcileIngress(ctx, spec, deployConfig, ctx.getKubernetesClient());
111110
}
112111

113112
@Override

flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/EventSourceUtils.java

Lines changed: 27 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@
3030

3131
import io.fabric8.kubernetes.api.model.HasMetadata;
3232
import io.fabric8.kubernetes.api.model.apps.Deployment;
33+
import io.fabric8.kubernetes.api.model.networking.v1.Ingress;
3334
import io.javaoperatorsdk.operator.api.config.informer.InformerEventSourceConfiguration;
3435
import io.javaoperatorsdk.operator.api.reconciler.EventSourceContext;
3536
import io.javaoperatorsdk.operator.processing.event.ResourceID;
@@ -45,9 +46,13 @@
4546
import java.util.stream.Collectors;
4647
import java.util.stream.Stream;
4748

49+
import static org.apache.flink.kubernetes.operator.utils.IngressUtils.ingressInNetworkingV1;
50+
4851
/** Utility class to locate secondary resources. */
4952
public class EventSourceUtils {
5053

54+
public static final String LABEL_COMPONENT_INGRESS = "ingress";
55+
5156
private static final String FLINK_DEPLOYMENT_IDX = FlinkDeploymentController.class.getName();
5257
private static final String FLINK_SESSIONJOB_IDX = FlinkSessionJobController.class.getName();
5358
private static final String FLINK_STATE_SNAPSHOT_IDX = FlinkStateSnapshot.class.getName();
@@ -95,7 +100,28 @@ public static InformerEventSource<Deployment, FlinkDeployment> getDeploymentInfo
95100
var configuration =
96101
InformerEventSourceConfiguration.from(Deployment.class, FlinkDeployment.class)
97102
.withLabelSelector(labelSelector)
98-
.withSecondaryToPrimaryMapper(fromLabel(Constants.LABEL_APP_KEY))
103+
.withNamespacesInheritedFromController()
104+
.withFollowControllerNamespacesChanges(true)
105+
.build();
106+
107+
return new InformerEventSource<>(configuration, context);
108+
}
109+
110+
public static InformerEventSource<?, FlinkDeployment> getIngressInformerEventSource(
111+
EventSourceContext<FlinkDeployment> context) {
112+
final String labelSelector =
113+
Map.of(Constants.LABEL_COMPONENT_KEY, LABEL_COMPONENT_INGRESS).entrySet().stream()
114+
.map(Object::toString)
115+
.collect(Collectors.joining(","));
116+
117+
var ingressClass =
118+
ingressInNetworkingV1(context.getClient())
119+
? Ingress.class
120+
: io.fabric8.kubernetes.api.model.networking.v1beta1.Ingress.class;
121+
122+
var configuration =
123+
InformerEventSourceConfiguration.from(ingressClass, FlinkDeployment.class)
124+
.withLabelSelector(labelSelector)
99125
.withNamespacesInheritedFromController()
100126
.withFollowControllerNamespacesChanges(true)
101127
.build();

flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/IngressUtils.java

Lines changed: 33 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -20,21 +20,24 @@
2020
import org.apache.flink.configuration.Configuration;
2121
import org.apache.flink.configuration.RestOptions;
2222
import org.apache.flink.kubernetes.operator.api.spec.FlinkDeploymentSpec;
23+
import org.apache.flink.kubernetes.operator.controller.FlinkResourceContext;
2324
import org.apache.flink.kubernetes.operator.exception.ReconciliationException;
25+
import org.apache.flink.kubernetes.utils.Constants;
2426
import org.apache.flink.util.Preconditions;
2527

2628
import io.fabric8.kubernetes.api.model.HasMetadata;
2729
import io.fabric8.kubernetes.api.model.IntOrString;
2830
import io.fabric8.kubernetes.api.model.ObjectMeta;
2931
import io.fabric8.kubernetes.api.model.OwnerReference;
3032
import io.fabric8.kubernetes.api.model.OwnerReferenceBuilder;
31-
import io.fabric8.kubernetes.api.model.apps.Deployment;
3233
import io.fabric8.kubernetes.api.model.networking.v1.HTTPIngressRuleValueBuilder;
3334
import io.fabric8.kubernetes.api.model.networking.v1.IngressBuilder;
3435
import io.fabric8.kubernetes.api.model.networking.v1.IngressRule;
3536
import io.fabric8.kubernetes.api.model.networking.v1.IngressRuleBuilder;
37+
import io.fabric8.kubernetes.api.model.networking.v1beta1.Ingress;
3638
import io.fabric8.kubernetes.api.model.networking.v1beta1.IngressTLS;
3739
import io.fabric8.kubernetes.client.KubernetesClient;
40+
import io.fabric8.kubernetes.client.dsl.NonDeletingOperation;
3841
import org.apache.commons.lang3.StringUtils;
3942
import org.slf4j.Logger;
4043
import org.slf4j.LoggerFactory;
@@ -43,11 +46,15 @@
4346
import java.net.MalformedURLException;
4447
import java.net.URL;
4548
import java.util.Collections;
49+
import java.util.HashMap;
4650
import java.util.List;
51+
import java.util.Map;
4752
import java.util.Optional;
4853
import java.util.regex.Pattern;
4954
import java.util.stream.Collectors;
5055

56+
import static org.apache.flink.kubernetes.operator.utils.EventSourceUtils.LABEL_COMPONENT_INGRESS;
57+
5158
/** Ingress utilities. */
5259
public class IngressUtils {
5360

@@ -62,29 +69,32 @@ public class IngressUtils {
6269

6370
private static final Logger LOG = LoggerFactory.getLogger(IngressUtils.class);
6471

65-
public static void updateIngressRules(
66-
ObjectMeta objectMeta,
72+
public static void reconcileIngress(
73+
FlinkResourceContext<?> ctx,
6774
FlinkDeploymentSpec spec,
6875
Configuration effectiveConfig,
6976
KubernetesClient client) {
7077

78+
var objectMeta = ctx.getResource().getMetadata();
7179
if (spec.getIngress() != null) {
7280
HasMetadata ingress = getIngress(objectMeta, spec, effectiveConfig, client);
73-
74-
Deployment deployment =
75-
client.apps()
76-
.deployments()
77-
.inNamespace(objectMeta.getNamespace())
78-
.withName(objectMeta.getName())
79-
.get();
80-
if (deployment == null) {
81-
LOG.error("Could not find deployment {}", objectMeta.getName());
81+
setOwnerReference(ctx.getResource(), Collections.singletonList(ingress));
82+
LOG.info("Updating ingress rules {}", ingress);
83+
client.resource(ingress)
84+
.inNamespace(objectMeta.getNamespace())
85+
.createOr(NonDeletingOperation::update);
86+
} else {
87+
Optional<? extends HasMetadata> ingress;
88+
if (ingressInNetworkingV1(client)) {
89+
ingress =
90+
ctx.getJosdkContext()
91+
.getSecondaryResource(
92+
io.fabric8.kubernetes.api.model.networking.v1.Ingress
93+
.class);
8294
} else {
83-
setOwnerReference(deployment, Collections.singletonList(ingress));
95+
ingress = ctx.getJosdkContext().getSecondaryResource(Ingress.class);
8496
}
85-
86-
LOG.info("Updating ingress rules {}", ingress);
87-
client.resourceList(ingress).inNamespace(objectMeta.getNamespace()).createOrReplace();
97+
ingress.ifPresent(i -> ctx.getKubernetesClient().resource(i).delete());
8898
}
8999
}
90100

@@ -93,10 +103,15 @@ private static HasMetadata getIngress(
93103
FlinkDeploymentSpec spec,
94104
Configuration effectiveConfig,
95105
KubernetesClient client) {
106+
Map<String, String> labels =
107+
spec.getIngress().getLabels() == null
108+
? new HashMap<>()
109+
: new HashMap<>(spec.getIngress().getLabels());
110+
labels.put(Constants.LABEL_COMPONENT_KEY, LABEL_COMPONENT_INGRESS);
96111
if (ingressInNetworkingV1(client)) {
97112
return new IngressBuilder()
98113
.withNewMetadata()
99-
.withLabels(spec.getIngress().getLabels())
114+
.withLabels(labels)
100115
.withAnnotations(spec.getIngress().getAnnotations())
101116
.withName(objectMeta.getName())
102117
.withNamespace(objectMeta.getNamespace())
@@ -128,7 +143,7 @@ private static HasMetadata getIngress(
128143
return new io.fabric8.kubernetes.api.model.networking.v1beta1.IngressBuilder()
129144
.withNewMetadata()
130145
.withAnnotations(spec.getIngress().getAnnotations())
131-
.withLabels(spec.getIngress().getLabels())
146+
.withLabels(labels)
132147
.withName(objectMeta.getName())
133148
.withNamespace(objectMeta.getNamespace())
134149
.endMetadata()

flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/FlinkOperatorTest.java

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -21,12 +21,13 @@
2121
import org.apache.flink.configuration.IllegalConfigurationException;
2222
import org.apache.flink.kubernetes.operator.config.KubernetesOperatorConfigOptions;
2323

24+
import io.fabric8.kubeapitest.junit.EnableKubeAPIServer;
2425
import io.fabric8.kubernetes.client.Config;
26+
import io.fabric8.kubernetes.client.KubernetesClient;
2527
import io.javaoperatorsdk.operator.RegisteredController;
2628
import io.javaoperatorsdk.operator.api.config.ControllerConfiguration;
2729
import io.javaoperatorsdk.operator.processing.event.rate.LinearRateLimiter;
2830
import org.junit.jupiter.api.Assertions;
29-
import org.junit.jupiter.api.BeforeAll;
3031
import org.junit.jupiter.api.Test;
3132

3233
import java.util.concurrent.ThreadPoolExecutor;
@@ -39,14 +40,13 @@
3940
* ConfigurationServiceProvider) we write multiple tests as a single function, please provide
4041
* ample comments.
4142
*/
42-
public class FlinkOperatorTest {
43-
@BeforeAll
44-
public static void setAutoTryKubeConfig() {
45-
System.setProperty(Config.KUBERNETES_AUTH_TRYKUBECONFIG_SYSTEM_PROPERTY, "false");
46-
}
43+
@EnableKubeAPIServer
44+
class FlinkOperatorTest {
45+
46+
static KubernetesClient kubernetesClient;
4747

4848
@Test
49-
public void testConfigurationPassedToJOSDK() {
49+
void testConfigurationPassedToJOSDK() {
5050
var testParallelism = 42;
5151
var testSelector = "flink=enabled";
5252
var testLeaseName = "test-lease";
@@ -64,7 +64,7 @@ public void testConfigurationPassedToJOSDK() {
6464
operatorConfig.set(
6565
KubernetesOperatorConfigOptions.OPERATOR_LEADER_ELECTION_LEASE_NAME, testLeaseName);
6666

67-
var testOperator = new FlinkOperator(operatorConfig);
67+
var testOperator = new FlinkOperator(operatorConfig, kubernetesClient);
6868
testOperator.registerDeploymentController();
6969
testOperator.registerSessionJobController();
7070

@@ -99,7 +99,7 @@ public void testConfigurationPassedToJOSDK() {
9999
}
100100

101101
@Test
102-
public void testLeaderElectionConfig() {
102+
void testLeaderElectionConfig() {
103103
var operatorConfig = new Configuration();
104104
operatorConfig.set(KubernetesOperatorConfigOptions.OPERATOR_LEADER_ELECTION_ENABLED, true);
105105

flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentControllerTest.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -68,6 +68,7 @@
6868
import static org.apache.flink.kubernetes.operator.config.KubernetesOperatorConfigOptions.OPERATOR_JOB_UPGRADE_LAST_STATE_FALLBACK_ENABLED;
6969
import static org.apache.flink.kubernetes.operator.config.KubernetesOperatorConfigOptions.SNAPSHOT_RESOURCE_ENABLED;
7070
import static org.apache.flink.kubernetes.operator.utils.EventRecorder.Reason.ValidationError;
71+
import static org.assertj.core.api.Assertions.assertThat;
7172
import static org.junit.jupiter.api.Assertions.assertEquals;
7273
import static org.junit.jupiter.api.Assertions.assertFalse;
7374
import static org.junit.jupiter.api.Assertions.assertNotEquals;
@@ -983,6 +984,9 @@ public void testIngressLifeCycle() throws Exception {
983984
appWithIngress.getMetadata().getName(),
984985
appWithIngress.getMetadata().getNamespace())
985986
.getHost());
987+
assertThat(ingress.getMetadata().getOwnerReferences()).hasSize(1);
988+
assertThat(ingress.getMetadata().getOwnerReferences().get(0).getKind())
989+
.isEqualTo(FlinkDeployment.class.getSimpleName());
986990
}
987991

988992
@Test

flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/deployment/SessionReconcilerTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -75,7 +75,7 @@ public void setup() {
7575
public void testStartSession() throws Exception {
7676
var count = new AtomicInteger(0);
7777
flinkService =
78-
new TestingFlinkService() {
78+
new TestingFlinkService(kubernetesClient) {
7979
@Override
8080
public void submitSessionCluster(Configuration conf) throws Exception {
8181
super.submitSessionCluster(conf);

0 commit comments

Comments
 (0)