diff --git a/docs/content/docs/custom-resource/reference.md b/docs/content/docs/custom-resource/reference.md index 003b6495b0..98d18e7d65 100644 --- a/docs/content/docs/custom-resource/reference.md +++ b/docs/content/docs/custom-resource/reference.md @@ -89,6 +89,7 @@ This serves as a full reference for FlinkDeployment and FlinkSessionJob custom r | Parameter | Type | Docs | | ----------| ---- | ---- | | configuration | java.util.Map | | +| ingress | org.apache.flink.kubernetes.operator.api.spec.IngressSpec | | | template | org.apache.flink.kubernetes.operator.api.spec.FlinkDeploymentTemplateSpec | | ### FlinkDeploymentSpec diff --git a/flink-kubernetes-operator-api/src/main/java/org/apache/flink/kubernetes/operator/api/spec/FlinkBlueGreenDeploymentSpec.java b/flink-kubernetes-operator-api/src/main/java/org/apache/flink/kubernetes/operator/api/spec/FlinkBlueGreenDeploymentSpec.java index 704d354152..a515b0a91b 100644 --- a/flink-kubernetes-operator-api/src/main/java/org/apache/flink/kubernetes/operator/api/spec/FlinkBlueGreenDeploymentSpec.java +++ b/flink-kubernetes-operator-api/src/main/java/org/apache/flink/kubernetes/operator/api/spec/FlinkBlueGreenDeploymentSpec.java @@ -25,6 +25,8 @@ import lombok.Data; import lombok.NoArgsConstructor; +import javax.annotation.Nullable; + import java.util.Map; /** Spec that describes a Flink application with blue/green deployment capabilities. */ @@ -38,5 +40,7 @@ public class FlinkBlueGreenDeploymentSpec { @JsonProperty("configuration") private Map configuration; + @Nullable private IngressSpec ingress; + private FlinkDeploymentTemplateSpec template; } diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/bluegreen/BlueGreenDeploymentService.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/bluegreen/BlueGreenDeploymentService.java index 85de365b97..e302a18887 100644 --- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/bluegreen/BlueGreenDeploymentService.java +++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/bluegreen/BlueGreenDeploymentService.java @@ -18,6 +18,7 @@ package org.apache.flink.kubernetes.operator.controller.bluegreen; import org.apache.flink.api.common.JobStatus; +import org.apache.flink.configuration.Configuration; import org.apache.flink.kubernetes.operator.api.FlinkBlueGreenDeployment; import org.apache.flink.kubernetes.operator.api.FlinkDeployment; import org.apache.flink.kubernetes.operator.api.bluegreen.BlueGreenDeploymentType; @@ -29,6 +30,7 @@ import org.apache.flink.kubernetes.operator.config.KubernetesOperatorConfigOptions; import org.apache.flink.kubernetes.operator.controller.FlinkBlueGreenDeployments; import org.apache.flink.kubernetes.operator.controller.FlinkResourceContext; +import org.apache.flink.kubernetes.operator.utils.IngressUtils; import org.apache.flink.kubernetes.operator.utils.bluegreen.BlueGreenUtils; import org.apache.flink.util.Preconditions; @@ -605,9 +607,56 @@ public UpdateControl finalizeBlueGreenDeployment( context.getDeploymentStatus().setAbortTimestamp(millisToInstantStr(0)); context.getDeploymentStatus().setSavepointTriggerId(null); + updateBlueGreenIngress(context, nextState); return patchStatusUpdateControl(context, nextState, JobStatus.RUNNING, null); } + /** + * Updates the ingress for Blue/Green deployment, pointing to the active deployment. + * + * @param context the Blue/Green context + * @param nextState which deployment (ACTIVE_BLUE or ACTIVE_GREEN) is currently active + */ + public void updateBlueGreenIngress( + BlueGreenContext context, FlinkBlueGreenDeploymentState nextState) { + + var bgDeployment = context.getBgDeployment(); + var bgSpec = bgDeployment.getSpec(); + + if (bgSpec.getIngress() == null) { + // No ingress configured, nothing to do + return; + } + + FlinkDeployment activeDeployment; + String serviceName; + switch (nextState) { + case ACTIVE_BLUE: + activeDeployment = context.getBlueDeployment(); + break; + case ACTIVE_GREEN: + activeDeployment = context.getGreenDeployment(); + break; + default: + LOG.debug("Skipping ingress reconciliation for non-active state: {}", nextState); + return; + } + + // Create a FlinkResourceContext for the active deployment to get proper config + FlinkResourceContext ctx = + context.getCtxFactory() + .getResourceContext(activeDeployment, context.getJosdkContext()); + // Get the deployment configuration (includes REST port and all Flink settings) + Configuration deployConfig = ctx.getDeployConfig(activeDeployment.getSpec()); + + // Call IngressUtils to reconcile the ingress pointing to the active service + IngressUtils.reconcileBlueGreenIngress( + context, + activeDeployment.getMetadata().getName(), + deployConfig, + context.getJosdkContext()); + } + // ==================== Common Utility Methods ==================== public static UpdateControl patchStatusUpdateControl( diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/IngressUtils.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/IngressUtils.java index 722270a1f6..a84491b386 100644 --- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/IngressUtils.java +++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/IngressUtils.java @@ -19,8 +19,11 @@ import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.RestOptions; +import org.apache.flink.kubernetes.operator.api.FlinkBlueGreenDeployment; import org.apache.flink.kubernetes.operator.api.spec.FlinkDeploymentSpec; +import org.apache.flink.kubernetes.operator.api.spec.IngressSpec; import org.apache.flink.kubernetes.operator.controller.FlinkResourceContext; +import org.apache.flink.kubernetes.operator.controller.bluegreen.BlueGreenContext; import org.apache.flink.kubernetes.operator.exception.ReconciliationException; import org.apache.flink.kubernetes.utils.Constants; import org.apache.flink.util.Preconditions; @@ -38,6 +41,7 @@ import io.fabric8.kubernetes.api.model.networking.v1beta1.IngressTLS; import io.fabric8.kubernetes.client.KubernetesClient; import io.fabric8.kubernetes.client.dsl.NonDeletingOperation; +import io.javaoperatorsdk.operator.api.reconciler.Context; import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.StringUtils; import org.slf4j.Logger; @@ -95,7 +99,8 @@ public static void reconcileIngress( } var objectMeta = ctx.getResource().getMetadata(); if (spec.getIngress() != null) { - HasMetadata ingress = getIngress(objectMeta, spec, effectiveConfig, client); + HasMetadata ingress = + getIngress(objectMeta, spec.getIngress(), effectiveConfig, client); setOwnerReference(ctx.getResource(), Collections.singletonList(ingress)); LOG.info("Updating ingress rules {}", ingress); client.resource(ingress) @@ -116,33 +121,95 @@ public static void reconcileIngress( } } + public static void reconcileBlueGreenIngress( + BlueGreenContext context, + String targetDeploymentName, + Configuration effectiveConfig, + Context client) { + // todo see if I need to find way to cover this using this blueGreen method + // todo see if eventRecorder is strictly required (would be nice to have) + // if (!ctx.getOperatorConfig().isManageIngress()) { + // if (spec.getIngress() != null) { + // eventRecorder.triggerEvent( + // ctx.getResource(), + // EventRecorder.Type.Warning, + // INGRESS_MANAGEMENT, + // INGRESS_MANAGEMENT_OFF_BUT_SPEC_SET, + // EventRecorder.Component.Operator, + // client); + // } + // + // return; + // } + var flinkBlueGreenDeploymentSpec = context.getBgDeployment().getSpec(); + var objectMeta = context.getBgDeployment().getMetadata(); + if (flinkBlueGreenDeploymentSpec.getIngress() != null) { + HasMetadata ingress = + getIngress( + objectMeta, + flinkBlueGreenDeploymentSpec.getIngress(), + effectiveConfig, + client.getClient(), + targetDeploymentName + REST_SVC_NAME_SUFFIX); + setOwnerReference(context.getBgDeployment(), Collections.singletonList(ingress)); + LOG.info("BLUE GREEN Updating ingress rules {}", ingress); + client.getClient() + .resource(ingress) + .inNamespace(objectMeta.getNamespace()) + .createOr(NonDeletingOperation::update); + } else { + LOG.info("BLUE GREEN NOTNOTNOT Updating ingress rules "); + Optional ingress; + if (ingressInNetworkingV1(client.getClient())) { + ingress = + client.getSecondaryResource( + io.fabric8.kubernetes.api.model.networking.v1.Ingress.class); + } else { + ingress = client.getSecondaryResource(Ingress.class); + } + ingress.ifPresent(i -> client.getClient().resource(i).delete()); + } + } + private static HasMetadata getIngress( ObjectMeta objectMeta, - FlinkDeploymentSpec spec, + IngressSpec spec, Configuration effectiveConfig, KubernetesClient client) { + return getIngress( + objectMeta, + spec, + effectiveConfig, + client, + objectMeta.getName() + REST_SVC_NAME_SUFFIX); + } + + private static HasMetadata getIngress( + ObjectMeta objectMeta, + IngressSpec spec, + Configuration effectiveConfig, + KubernetesClient client, + String serviceName) { Map labels = - spec.getIngress().getLabels() == null - ? new HashMap<>() - : new HashMap<>(spec.getIngress().getLabels()); + spec.getLabels() == null ? new HashMap<>() : new HashMap<>(spec.getLabels()); labels.put(Constants.LABEL_COMPONENT_KEY, LABEL_COMPONENT_INGRESS); if (ingressInNetworkingV1(client)) { return new IngressBuilder() .withNewMetadata() .withLabels(labels) - .withAnnotations(spec.getIngress().getAnnotations()) + .withAnnotations(spec.getAnnotations()) .withName(objectMeta.getName()) .withNamespace(objectMeta.getNamespace()) .endMetadata() .withNewSpec() - .withIngressClassName(spec.getIngress().getClassName()) - .withTls(spec.getIngress().getTls()) - .withRules(getIngressRule(objectMeta, spec, effectiveConfig)) + .withIngressClassName(spec.getClassName()) + .withTls(spec.getTls()) + .withRules(getIngressRule(objectMeta, spec, effectiveConfig, serviceName)) .endSpec() .build(); } else { List ingressTLS = - Optional.ofNullable(spec.getIngress().getTls()) + Optional.ofNullable(spec.getTls()) .map( list -> list.stream() @@ -160,30 +227,33 @@ private static HasMetadata getIngress( .orElse(Collections.emptyList()); return new io.fabric8.kubernetes.api.model.networking.v1beta1.IngressBuilder() .withNewMetadata() - .withAnnotations(spec.getIngress().getAnnotations()) + .withAnnotations(spec.getAnnotations()) .withLabels(labels) .withName(objectMeta.getName()) .withNamespace(objectMeta.getNamespace()) .endMetadata() .withNewSpec() - .withIngressClassName(spec.getIngress().getClassName()) + .withIngressClassName(spec.getClassName()) .withTls(ingressTLS) - .withRules(getIngressRuleForV1beta1(objectMeta, spec, effectiveConfig)) + .withRules( + getIngressRuleForV1beta1( + objectMeta, spec, effectiveConfig, serviceName)) .endSpec() .build(); } } + // Todo remove, creatse new svc with every ingress call private static IngressRule getIngressRule( - ObjectMeta objectMeta, FlinkDeploymentSpec spec, Configuration effectiveConfig) { + ObjectMeta objectMeta, + IngressSpec spec, + Configuration effectiveConfig, + String serviceName) { final String clusterId = objectMeta.getName(); final int restPort = effectiveConfig.getInteger(RestOptions.PORT); URL ingressUrl = - getIngressUrl( - spec.getIngress().getTemplate(), - objectMeta.getName(), - objectMeta.getNamespace()); + getIngressUrl(spec.getTemplate(), objectMeta.getName(), objectMeta.getNamespace()); IngressRuleBuilder ingressRuleBuilder = new IngressRuleBuilder(); ingressRuleBuilder.withHttp( @@ -192,7 +262,7 @@ private static IngressRule getIngressRule( .withPathType("ImplementationSpecific") .withNewBackend() .withNewService() - .withName(clusterId + REST_SVC_NAME_SUFFIX) + .withName(serviceName) .withNewPort() .withNumber(restPort) .endPort() @@ -219,16 +289,14 @@ private static IngressRule getIngressRule( private static io.fabric8.kubernetes.api.model.networking.v1beta1.IngressRule getIngressRuleForV1beta1( ObjectMeta objectMeta, - FlinkDeploymentSpec spec, - Configuration effectiveConfig) { + IngressSpec spec, + Configuration effectiveConfig, + String serviceName) { final String clusterId = objectMeta.getName(); final int restPort = effectiveConfig.getInteger(RestOptions.PORT); URL ingressUrl = - getIngressUrl( - spec.getIngress().getTemplate(), - objectMeta.getName(), - objectMeta.getNamespace()); + getIngressUrl(spec.getTemplate(), objectMeta.getName(), objectMeta.getNamespace()); io.fabric8.kubernetes.api.model.networking.v1beta1.IngressRuleBuilder ingressRuleBuilder = new io.fabric8.kubernetes.api.model.networking.v1beta1.IngressRuleBuilder(); @@ -236,7 +304,7 @@ private static IngressRule getIngressRule( new io.fabric8.kubernetes.api.model.networking.v1beta1.HTTPIngressRuleValueBuilder() .addNewPath() .withNewBackend() - .withServiceName(clusterId + REST_SVC_NAME_SUFFIX) + .withServiceName(serviceName) .withServicePort(new IntOrString(restPort)) .endBackend() .endPath() diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/bluegreen/BlueGreenUtils.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/bluegreen/BlueGreenUtils.java index 98344a9ba8..cf9797e66c 100644 --- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/bluegreen/BlueGreenUtils.java +++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/bluegreen/BlueGreenUtils.java @@ -350,6 +350,17 @@ public static FlinkDeployment prepareFlinkDeployment( flinkDeployment.setSpec(spec.getTemplate().getSpec()); + // Update Ingress template if exists to prevent path collision between Blue and Green + if (flinkDeployment.getSpec().getIngress() != null) { + flinkDeployment + .getSpec() + .getIngress() + .setTemplate( + blueGreenDeploymentType.toString().toLowerCase() + + "-" + + flinkDeployment.getSpec().getIngress().getTemplate()); + } + // Deployment metadata ObjectMeta flinkDeploymentMeta = getDependentObjectMeta(context.getBgDeployment()); flinkDeploymentMeta.setName(childDeploymentName); diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkBlueGreenDeploymentControllerTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkBlueGreenDeploymentControllerTest.java index 229406bbc9..b9dbdd4449 100644 --- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkBlueGreenDeploymentControllerTest.java +++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkBlueGreenDeploymentControllerTest.java @@ -1224,6 +1224,6 @@ private static FlinkBlueGreenDeploymentSpec getTestFlinkDeploymentSpec(FlinkVers var flinkDeploymentTemplateSpec = FlinkDeploymentTemplateSpec.builder().spec(flinkDeploymentSpec).build(); - return new FlinkBlueGreenDeploymentSpec(configuration, flinkDeploymentTemplateSpec); + return new FlinkBlueGreenDeploymentSpec(configuration, null, flinkDeploymentTemplateSpec); } } diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/utils/bluegreen/BlueGreenUtilsTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/utils/bluegreen/BlueGreenUtilsTest.java index 859d19f5d3..c5df792308 100644 --- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/utils/bluegreen/BlueGreenUtilsTest.java +++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/utils/bluegreen/BlueGreenUtilsTest.java @@ -102,6 +102,7 @@ private static FlinkBlueGreenDeployment buildBlueGreenDeployment( var bgDeploymentSpec = new FlinkBlueGreenDeploymentSpec( new HashMap<>(), + null, FlinkDeploymentTemplateSpec.builder().spec(flinkDeploymentSpec).build()); deployment.setSpec(bgDeploymentSpec); diff --git a/helm/flink-kubernetes-operator/crds/flinkbluegreendeployments.flink.apache.org-v1.yml b/helm/flink-kubernetes-operator/crds/flinkbluegreendeployments.flink.apache.org-v1.yml index d31d14aa24..0f8ef636fb 100644 --- a/helm/flink-kubernetes-operator/crds/flinkbluegreendeployments.flink.apache.org-v1.yml +++ b/helm/flink-kubernetes-operator/crds/flinkbluegreendeployments.flink.apache.org-v1.yml @@ -24,6 +24,32 @@ spec: properties: spec: properties: + ingress: + properties: + annotations: + additionalProperties: + type: string + type: object + className: + type: string + labels: + additionalProperties: + type: string + type: object + template: + type: string + tls: + items: + properties: + hosts: + items: + type: string + type: array + secretName: + type: string + type: object + type: array + type: object configuration: additionalProperties: type: "string"