Skip to content

Commit 9c49fb2

Browse files
authored
[FLINK-37193] Feature flag if operator should manage ingress
1 parent 2b7a1d7 commit 9c49fb2

File tree

11 files changed

+122
-20
lines changed

11 files changed

+122
-20
lines changed

docs/layouts/shortcodes/generated/kubernetes_operator_config_configuration.html

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -170,6 +170,12 @@
170170
<td>Integer</td>
171171
<td>The port the health probe will use to expose the status.</td>
172172
</tr>
173+
<tr>
174+
<td><h5>kubernetes.operator.ingress.manage</h5></td>
175+
<td style="word-wrap: break-word;">true</td>
176+
<td>Boolean</td>
177+
<td>Feature flag if operator will manage the Ingress resource. If false, no InformerEventSource will be registered for Ingress, and Ingress won't be created.</td>
178+
</tr>
173179
<tr>
174180
<td><h5>kubernetes.operator.jm-deployment-recovery.enabled</h5></td>
175181
<td style="word-wrap: break-word;">true</td>

docs/layouts/shortcodes/generated/system_advanced_section.html

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -68,6 +68,12 @@
6868
<td>Integer</td>
6969
<td>The port the health probe will use to expose the status.</td>
7070
</tr>
71+
<tr>
72+
<td><h5>kubernetes.operator.ingress.manage</h5></td>
73+
<td style="word-wrap: break-word;">true</td>
74+
<td>Boolean</td>
75+
<td>Feature flag if operator will manage the Ingress resource. If false, no InformerEventSource will be registered for Ingress, and Ingress won't be created.</td>
76+
</tr>
7177
<tr>
7278
<td><h5>kubernetes.operator.label.selector</h5></td>
7379
<td style="word-wrap: break-word;">(none)</td>

flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/FlinkOperator.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -199,7 +199,8 @@ void registerDeploymentController() {
199199
observerFactory,
200200
statusRecorder,
201201
eventRecorder,
202-
canaryResourceManager);
202+
canaryResourceManager,
203+
configManager);
203204
registeredControllers.add(operator.register(controller, this::overrideControllerConfigs));
204205
}
205206

flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/FlinkOperatorConfiguration.java

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -79,6 +79,7 @@ public class FlinkOperatorConfiguration {
7979
Duration slowRequestThreshold;
8080
int reportedExceptionEventsMaxCount;
8181
int reportedExceptionEventsMaxStackTraceLength;
82+
boolean manageIngress;
8283

8384
public static FlinkOperatorConfiguration fromConfiguration(Configuration operatorConfig) {
8485
Duration reconcileInterval =
@@ -203,6 +204,9 @@ public static FlinkOperatorConfiguration fromConfiguration(Configuration operato
203204
operatorConfig.get(
204205
KubernetesOperatorConfigOptions.OPERATOR_EVENT_EXCEPTION_STACKTRACE_LINES);
205206

207+
boolean manageIngress =
208+
operatorConfig.get(KubernetesOperatorConfigOptions.OPERATOR_MANAGE_INGRESS);
209+
206210
return new FlinkOperatorConfiguration(
207211
reconcileInterval,
208212
reconcilerMaxParallelism,
@@ -234,7 +238,8 @@ public static FlinkOperatorConfiguration fromConfiguration(Configuration operato
234238
snapshotResourcesEnabled,
235239
slowRequestThreshold,
236240
reportedExceptionEventsMaxCount,
237-
reportedExceptionEventsMaxStackTraceLength);
241+
reportedExceptionEventsMaxStackTraceLength,
242+
manageIngress);
238243
}
239244

240245
private static GenericRetry getRetryConfig(Configuration conf) {

flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/KubernetesOperatorConfigOptions.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -679,4 +679,12 @@ public static String operatorConfigKey(String key) {
679679
.defaultValue(10)
680680
.withDescription(
681681
"Maximum number of exception-related Kubernetes events emitted per reconciliation cycle.");
682+
683+
@Documentation.Section(SECTION_ADVANCED)
684+
public static final ConfigOption<Boolean> OPERATOR_MANAGE_INGRESS =
685+
operatorConfig("ingress.manage")
686+
.booleanType()
687+
.defaultValue(true)
688+
.withDescription(
689+
"Feature flag if operator will manage the Ingress resource. If false, no InformerEventSource will be registered for Ingress, and Ingress won't be created.");
682690
}

flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentController.java

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import org.apache.flink.kubernetes.operator.api.lifecycle.ResourceLifecycleState;
2424
import org.apache.flink.kubernetes.operator.api.status.FlinkDeploymentStatus;
2525
import org.apache.flink.kubernetes.operator.api.status.JobManagerDeploymentStatus;
26+
import org.apache.flink.kubernetes.operator.config.FlinkConfigManager;
2627
import org.apache.flink.kubernetes.operator.exception.DeploymentFailedException;
2728
import org.apache.flink.kubernetes.operator.exception.ReconciliationException;
2829
import org.apache.flink.kubernetes.operator.exception.UpgradeFailureException;
@@ -69,6 +70,7 @@ public class FlinkDeploymentController
6970
private final StatusRecorder<FlinkDeployment, FlinkDeploymentStatus> statusRecorder;
7071
private final EventRecorder eventRecorder;
7172
private final CanaryResourceManager<FlinkDeployment> canaryResourceManager;
73+
private final FlinkConfigManager flinkConfigManager;
7274

7375
public FlinkDeploymentController(
7476
Set<FlinkResourceValidator> validators,
@@ -77,14 +79,16 @@ public FlinkDeploymentController(
7779
FlinkDeploymentObserverFactory observerFactory,
7880
StatusRecorder<FlinkDeployment, FlinkDeploymentStatus> statusRecorder,
7981
EventRecorder eventRecorder,
80-
CanaryResourceManager<FlinkDeployment> canaryResourceManager) {
82+
CanaryResourceManager<FlinkDeployment> canaryResourceManager,
83+
FlinkConfigManager flinkConfigManager) {
8184
this.validators = validators;
8285
this.ctxFactory = ctxFactory;
8386
this.reconcilerFactory = reconcilerFactory;
8487
this.observerFactory = observerFactory;
8588
this.statusRecorder = statusRecorder;
8689
this.eventRecorder = eventRecorder;
8790
this.canaryResourceManager = canaryResourceManager;
91+
this.flinkConfigManager = flinkConfigManager;
8892
}
8993

9094
@Override
@@ -184,7 +188,9 @@ public List<EventSource<?, FlinkDeployment>> prepareEventSources(
184188
List<EventSource<?, FlinkDeployment>> eventSources = new ArrayList<>();
185189
eventSources.add(EventSourceUtils.getSessionJobInformerEventSource(context));
186190
eventSources.add(EventSourceUtils.getDeploymentInformerEventSource(context));
187-
eventSources.add(EventSourceUtils.getIngressInformerEventSource(context));
191+
if (flinkConfigManager.getOperatorConfiguration().isManageIngress()) {
192+
eventSources.add(EventSourceUtils.getIngressInformerEventSource(context));
193+
}
188194
if (KubernetesClientUtils.isCrdInstalled(FlinkStateSnapshot.class)) {
189195
eventSources.add(
190196
EventSourceUtils.getStateSnapshotForFlinkResourceInformerEventSource(context));

flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconciler.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -190,7 +190,8 @@ public void deploy(
190190
status.getJobStatus().setState(org.apache.flink.api.common.JobStatus.RECONCILING);
191191
status.setJobManagerDeploymentStatus(JobManagerDeploymentStatus.DEPLOYING);
192192

193-
IngressUtils.reconcileIngress(ctx, spec, deployConfig, ctx.getKubernetesClient());
193+
IngressUtils.reconcileIngress(
194+
ctx, spec, deployConfig, ctx.getKubernetesClient(), eventRecorder);
194195
}
195196

196197
private void setJobIdIfNecessary(

flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/SessionReconciler.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -106,7 +106,8 @@ public void deploy(
106106
setOwnerReference(cr, deployConfig);
107107
ctx.getFlinkService().submitSessionCluster(deployConfig);
108108
cr.getStatus().setJobManagerDeploymentStatus(JobManagerDeploymentStatus.DEPLOYING);
109-
IngressUtils.reconcileIngress(ctx, spec, deployConfig, ctx.getKubernetesClient());
109+
IngressUtils.reconcileIngress(
110+
ctx, spec, deployConfig, ctx.getKubernetesClient(), eventRecorder);
110111
}
111112

112113
@Override

flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/IngressUtils.java

Lines changed: 19 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@
3838
import io.fabric8.kubernetes.api.model.networking.v1beta1.IngressTLS;
3939
import io.fabric8.kubernetes.client.KubernetesClient;
4040
import io.fabric8.kubernetes.client.dsl.NonDeletingOperation;
41+
import lombok.extern.slf4j.Slf4j;
4142
import org.apache.commons.lang3.StringUtils;
4243
import org.slf4j.Logger;
4344
import org.slf4j.LoggerFactory;
@@ -56,6 +57,7 @@
5657
import static org.apache.flink.kubernetes.operator.utils.EventSourceUtils.LABEL_COMPONENT_INGRESS;
5758

5859
/** Ingress utilities. */
60+
@Slf4j
5961
public class IngressUtils {
6062

6163
private static final Pattern NAME_PTN =
@@ -68,13 +70,29 @@ public class IngressUtils {
6870
private static final String REST_SVC_NAME_SUFFIX = "-rest";
6971

7072
private static final Logger LOG = LoggerFactory.getLogger(IngressUtils.class);
73+
public static final String INGRESS_MANAGEMENT_OFF_BUT_SPEC_SET =
74+
"Ingress management is turned off but ingress set in spec";
75+
public static final String INGRESS_MANAGEMENT = "IngressManagement";
7176

7277
public static void reconcileIngress(
7378
FlinkResourceContext<?> ctx,
7479
FlinkDeploymentSpec spec,
7580
Configuration effectiveConfig,
76-
KubernetesClient client) {
81+
KubernetesClient client,
82+
EventRecorder eventRecorder) {
83+
if (!ctx.getOperatorConfig().isManageIngress()) {
84+
if (spec.getIngress() != null) {
85+
eventRecorder.triggerEvent(
86+
ctx.getResource(),
87+
EventRecorder.Type.Warning,
88+
INGRESS_MANAGEMENT,
89+
INGRESS_MANAGEMENT_OFF_BUT_SPEC_SET,
90+
EventRecorder.Component.Operator,
91+
client);
92+
}
7793

94+
return;
95+
}
7896
var objectMeta = ctx.getResource().getMetadata();
7997
if (spec.getIngress() != null) {
8098
HasMetadata ingress = getIngress(objectMeta, spec, effectiveConfig, client);

flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/TestingFlinkDeploymentController.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -110,7 +110,8 @@ public TestingFlinkDeploymentController(
110110
new FlinkDeploymentObserverFactory(eventRecorder),
111111
statusRecorder,
112112
eventRecorder,
113-
canaryResourceManager);
113+
canaryResourceManager,
114+
new FlinkConfigManager(Configuration.fromMap(Map.of())));
114115
}
115116

116117
@Override

0 commit comments

Comments
 (0)