Skip to content

Commit e2e5e04

Browse files
committed
Feature flag for managing ingress
Signed-off-by: Attila Mészáros <[email protected]>
1 parent d9fa479 commit e2e5e04

File tree

7 files changed

+74
-7
lines changed

7 files changed

+74
-7
lines changed

flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/FlinkOperator.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -199,7 +199,8 @@ void registerDeploymentController() {
199199
observerFactory,
200200
statusRecorder,
201201
eventRecorder,
202-
canaryResourceManager);
202+
canaryResourceManager,
203+
configManager);
203204
registeredControllers.add(operator.register(controller, this::overrideControllerConfigs));
204205
}
205206

flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/FlinkOperatorConfiguration.java

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -79,6 +79,7 @@ public class FlinkOperatorConfiguration {
7979
Duration slowRequestThreshold;
8080
int reportedExceptionEventsMaxCount;
8181
int reportedExceptionEventsMaxStackTraceLength;
82+
boolean manageIngress;
8283

8384
public static FlinkOperatorConfiguration fromConfiguration(Configuration operatorConfig) {
8485
Duration reconcileInterval =
@@ -203,6 +204,9 @@ public static FlinkOperatorConfiguration fromConfiguration(Configuration operato
203204
operatorConfig.get(
204205
KubernetesOperatorConfigOptions.OPERATOR_EVENT_EXCEPTION_STACKTRACE_LINES);
205206

207+
boolean manageIngress =
208+
operatorConfig.get(KubernetesOperatorConfigOptions.OPERATOR_MANAGE_INGRESS);
209+
206210
return new FlinkOperatorConfiguration(
207211
reconcileInterval,
208212
reconcilerMaxParallelism,
@@ -234,7 +238,8 @@ public static FlinkOperatorConfiguration fromConfiguration(Configuration operato
234238
snapshotResourcesEnabled,
235239
slowRequestThreshold,
236240
reportedExceptionEventsMaxCount,
237-
reportedExceptionEventsMaxStackTraceLength);
241+
reportedExceptionEventsMaxStackTraceLength,
242+
manageIngress);
238243
}
239244

240245
private static GenericRetry getRetryConfig(Configuration conf) {

flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/KubernetesOperatorConfigOptions.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -679,4 +679,12 @@ public static String operatorConfigKey(String key) {
679679
.defaultValue(10)
680680
.withDescription(
681681
"Maximum number of exception-related Kubernetes events emitted per reconciliation cycle.");
682+
683+
@Documentation.Section(SECTION_ADVANCED)
684+
public static final ConfigOption<Boolean> OPERATOR_MANAGE_INGRESS =
685+
operatorConfig("ingress.manage")
686+
.booleanType()
687+
.defaultValue(true)
688+
.withDescription(
689+
"Feature flag if operator will manage the Ingress resource. If false, no InformerEventSource will be registered for Ingress, and Ingress won't be created.");
682690
}

flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentController.java

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import org.apache.flink.kubernetes.operator.api.lifecycle.ResourceLifecycleState;
2424
import org.apache.flink.kubernetes.operator.api.status.FlinkDeploymentStatus;
2525
import org.apache.flink.kubernetes.operator.api.status.JobManagerDeploymentStatus;
26+
import org.apache.flink.kubernetes.operator.config.FlinkConfigManager;
2627
import org.apache.flink.kubernetes.operator.exception.DeploymentFailedException;
2728
import org.apache.flink.kubernetes.operator.exception.ReconciliationException;
2829
import org.apache.flink.kubernetes.operator.exception.UpgradeFailureException;
@@ -69,6 +70,7 @@ public class FlinkDeploymentController
6970
private final StatusRecorder<FlinkDeployment, FlinkDeploymentStatus> statusRecorder;
7071
private final EventRecorder eventRecorder;
7172
private final CanaryResourceManager<FlinkDeployment> canaryResourceManager;
73+
private final FlinkConfigManager flinkConfigManager;
7274

7375
public FlinkDeploymentController(
7476
Set<FlinkResourceValidator> validators,
@@ -77,14 +79,16 @@ public FlinkDeploymentController(
7779
FlinkDeploymentObserverFactory observerFactory,
7880
StatusRecorder<FlinkDeployment, FlinkDeploymentStatus> statusRecorder,
7981
EventRecorder eventRecorder,
80-
CanaryResourceManager<FlinkDeployment> canaryResourceManager) {
82+
CanaryResourceManager<FlinkDeployment> canaryResourceManager,
83+
FlinkConfigManager flinkConfigManager) {
8184
this.validators = validators;
8285
this.ctxFactory = ctxFactory;
8386
this.reconcilerFactory = reconcilerFactory;
8487
this.observerFactory = observerFactory;
8588
this.statusRecorder = statusRecorder;
8689
this.eventRecorder = eventRecorder;
8790
this.canaryResourceManager = canaryResourceManager;
91+
this.flinkConfigManager = flinkConfigManager;
8892
}
8993

9094
@Override
@@ -184,7 +188,9 @@ public List<EventSource<?, FlinkDeployment>> prepareEventSources(
184188
List<EventSource<?, FlinkDeployment>> eventSources = new ArrayList<>();
185189
eventSources.add(EventSourceUtils.getSessionJobInformerEventSource(context));
186190
eventSources.add(EventSourceUtils.getDeploymentInformerEventSource(context));
187-
eventSources.add(EventSourceUtils.getIngressInformerEventSource(context));
191+
if (flinkConfigManager.getOperatorConfiguration().isManageIngress()) {
192+
eventSources.add(EventSourceUtils.getIngressInformerEventSource(context));
193+
}
188194
if (KubernetesClientUtils.isCrdInstalled(FlinkStateSnapshot.class)) {
189195
eventSources.add(
190196
EventSourceUtils.getStateSnapshotForFlinkResourceInformerEventSource(context));

flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/IngressUtils.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -74,7 +74,9 @@ public static void reconcileIngress(
7474
FlinkDeploymentSpec spec,
7575
Configuration effectiveConfig,
7676
KubernetesClient client) {
77-
77+
if (!ctx.getOperatorConfig().isManageIngress()) {
78+
return;
79+
}
7880
var objectMeta = ctx.getResource().getMetadata();
7981
if (spec.getIngress() != null) {
8082
HasMetadata ingress = getIngress(objectMeta, spec, effectiveConfig, client);

flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/TestingFlinkDeploymentController.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -110,7 +110,8 @@ public TestingFlinkDeploymentController(
110110
new FlinkDeploymentObserverFactory(eventRecorder),
111111
statusRecorder,
112112
eventRecorder,
113-
canaryResourceManager);
113+
canaryResourceManager,
114+
new FlinkConfigManager(Configuration.fromMap(Map.of())));
114115
}
115116

116117
@Override

flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/utils/IngressUtilsTest.java

Lines changed: 45 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -38,9 +38,11 @@
3838

3939
import java.net.URL;
4040
import java.util.ArrayList;
41+
import java.util.HashMap;
4142
import java.util.List;
4243
import java.util.Map;
4344

45+
import static org.apache.flink.kubernetes.operator.config.KubernetesOperatorConfigOptions.OPERATOR_MANAGE_INGRESS;
4446
import static org.assertj.core.api.Assertions.assertThat;
4547
import static org.junit.jupiter.api.Assertions.assertEquals;
4648
import static org.junit.jupiter.api.Assertions.assertNull;
@@ -57,7 +59,20 @@ class IngressUtilsTest {
5759

5860
private FlinkResourceContext<?> createResourceContext(FlinkDeployment appCluster) {
5961
testingJosdkContext = new TestingJosdkContext<>(client);
60-
return new FlinkDeploymentContext(appCluster, testingJosdkContext, null, null, null, null);
62+
return new FlinkDeploymentContext(
63+
appCluster,
64+
testingJosdkContext,
65+
null,
66+
new FlinkConfigManager(Configuration.fromMap(new HashMap<>())),
67+
null,
68+
null);
69+
}
70+
71+
private FlinkResourceContext<?> createResourceContext(
72+
FlinkDeployment appCluster, FlinkConfigManager configManager) {
73+
testingJosdkContext = new TestingJosdkContext<>(client);
74+
return new FlinkDeploymentContext(
75+
appCluster, testingJosdkContext, null, configManager, null, null);
6176
}
6277

6378
@Test
@@ -560,4 +575,33 @@ void testDeletesIngress() {
560575
.get();
561576
assertThat(ingressV1beta1).isNull();
562577
}
578+
579+
@Test
580+
void skipIngressReconciliationIfFeatureFlagOff() {
581+
FlinkDeployment appCluster = TestUtils.buildApplicationCluster();
582+
FlinkConfigManager manager =
583+
new FlinkConfigManager(
584+
Configuration.fromMap(Map.of(OPERATOR_MANAGE_INGRESS.key(), "false")));
585+
var context = createResourceContext(appCluster, manager);
586+
context.getOperatorConfig();
587+
Configuration config =
588+
new FlinkConfigManager(new Configuration())
589+
.getDeployConfig(appCluster.getMetadata(), appCluster.getSpec());
590+
591+
IngressSpec.IngressSpecBuilder builder = IngressSpec.builder();
592+
builder.template("{{name}}.{{namespace}}.example.com");
593+
builder.tls(new ArrayList<>());
594+
appCluster.getSpec().setIngress(builder.build());
595+
596+
IngressUtils.reconcileIngress(context, appCluster.getSpec(), config, client);
597+
598+
var ingressV1beta1 =
599+
client.network()
600+
.v1beta1()
601+
.ingresses()
602+
.inNamespace(appCluster.getMetadata().getNamespace())
603+
.withName(appCluster.getMetadata().getName())
604+
.get();
605+
assertThat(ingressV1beta1).isNull();
606+
}
563607
}

0 commit comments

Comments
 (0)