Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions docs/content/docs/custom-resource/reference.md
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,7 @@ This serves as a full reference for FlinkDeployment and FlinkSessionJob custom r
| Parameter | Type | Docs |
| ----------| ---- | ---- |
| configuration | java.util.Map<java.lang.String,java.lang.String> | |
| ingress | org.apache.flink.kubernetes.operator.api.spec.IngressSpec | |
| template | org.apache.flink.kubernetes.operator.api.spec.FlinkDeploymentTemplateSpec | |

### FlinkDeploymentSpec
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@
import lombok.Data;
import lombok.NoArgsConstructor;

import javax.annotation.Nullable;

import java.util.Map;

/** Spec that describes a Flink application with blue/green deployment capabilities. */
Expand All @@ -38,5 +40,7 @@ public class FlinkBlueGreenDeploymentSpec {
@JsonProperty("configuration")
private Map<String, String> configuration;

@Nullable private IngressSpec ingress;

private FlinkDeploymentTemplateSpec template;
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package org.apache.flink.kubernetes.operator.controller.bluegreen;

import org.apache.flink.api.common.JobStatus;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.kubernetes.operator.api.FlinkBlueGreenDeployment;
import org.apache.flink.kubernetes.operator.api.FlinkDeployment;
import org.apache.flink.kubernetes.operator.api.bluegreen.BlueGreenDeploymentType;
Expand All @@ -29,6 +30,7 @@
import org.apache.flink.kubernetes.operator.config.KubernetesOperatorConfigOptions;
import org.apache.flink.kubernetes.operator.controller.FlinkBlueGreenDeployments;
import org.apache.flink.kubernetes.operator.controller.FlinkResourceContext;
import org.apache.flink.kubernetes.operator.utils.IngressUtils;
import org.apache.flink.kubernetes.operator.utils.bluegreen.BlueGreenUtils;
import org.apache.flink.util.Preconditions;

Expand Down Expand Up @@ -605,9 +607,56 @@ public UpdateControl<FlinkBlueGreenDeployment> finalizeBlueGreenDeployment(
context.getDeploymentStatus().setAbortTimestamp(millisToInstantStr(0));
context.getDeploymentStatus().setSavepointTriggerId(null);

updateBlueGreenIngress(context, nextState);
return patchStatusUpdateControl(context, nextState, JobStatus.RUNNING, null);
}

/**
* Updates the ingress for Blue/Green deployment, pointing to the active deployment.
*
* @param context the Blue/Green context
* @param nextState which deployment (ACTIVE_BLUE or ACTIVE_GREEN) is currently active
*/
public void updateBlueGreenIngress(
BlueGreenContext context, FlinkBlueGreenDeploymentState nextState) {

var bgDeployment = context.getBgDeployment();
var bgSpec = bgDeployment.getSpec();

if (bgSpec.getIngress() == null) {
// No ingress configured, nothing to do
return;
}

FlinkDeployment activeDeployment;
String serviceName;
switch (nextState) {
case ACTIVE_BLUE:
activeDeployment = context.getBlueDeployment();
break;
case ACTIVE_GREEN:
activeDeployment = context.getGreenDeployment();
break;
default:
LOG.debug("Skipping ingress reconciliation for non-active state: {}", nextState);
return;
}

// Create a FlinkResourceContext for the active deployment to get proper config
FlinkResourceContext<FlinkDeployment> ctx =
context.getCtxFactory()
.getResourceContext(activeDeployment, context.getJosdkContext());
// Get the deployment configuration (includes REST port and all Flink settings)
Configuration deployConfig = ctx.getDeployConfig(activeDeployment.getSpec());

// Call IngressUtils to reconcile the ingress pointing to the active service
IngressUtils.reconcileBlueGreenIngress(
context,
activeDeployment.getMetadata().getName(),
deployConfig,
context.getJosdkContext());
}

// ==================== Common Utility Methods ====================

public static UpdateControl<FlinkBlueGreenDeployment> patchStatusUpdateControl(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,11 @@

import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.RestOptions;
import org.apache.flink.kubernetes.operator.api.FlinkBlueGreenDeployment;
import org.apache.flink.kubernetes.operator.api.spec.FlinkDeploymentSpec;
import org.apache.flink.kubernetes.operator.api.spec.IngressSpec;
import org.apache.flink.kubernetes.operator.controller.FlinkResourceContext;
import org.apache.flink.kubernetes.operator.controller.bluegreen.BlueGreenContext;
import org.apache.flink.kubernetes.operator.exception.ReconciliationException;
import org.apache.flink.kubernetes.utils.Constants;
import org.apache.flink.util.Preconditions;
Expand All @@ -38,6 +41,7 @@
import io.fabric8.kubernetes.api.model.networking.v1beta1.IngressTLS;
import io.fabric8.kubernetes.client.KubernetesClient;
import io.fabric8.kubernetes.client.dsl.NonDeletingOperation;
import io.javaoperatorsdk.operator.api.reconciler.Context;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
Expand Down Expand Up @@ -95,7 +99,8 @@ public static void reconcileIngress(
}
var objectMeta = ctx.getResource().getMetadata();
if (spec.getIngress() != null) {
HasMetadata ingress = getIngress(objectMeta, spec, effectiveConfig, client);
HasMetadata ingress =
getIngress(objectMeta, spec.getIngress(), effectiveConfig, client);
setOwnerReference(ctx.getResource(), Collections.singletonList(ingress));
LOG.info("Updating ingress rules {}", ingress);
client.resource(ingress)
Expand All @@ -116,33 +121,95 @@ public static void reconcileIngress(
}
}

public static void reconcileBlueGreenIngress(
BlueGreenContext context,
String targetDeploymentName,
Configuration effectiveConfig,
Context<FlinkBlueGreenDeployment> client) {
// todo see if I need to find way to cover this using this blueGreen method
// todo see if eventRecorder is strictly required (would be nice to have)
// if (!ctx.getOperatorConfig().isManageIngress()) {
// if (spec.getIngress() != null) {
// eventRecorder.triggerEvent(
// ctx.getResource(),
// EventRecorder.Type.Warning,
// INGRESS_MANAGEMENT,
// INGRESS_MANAGEMENT_OFF_BUT_SPEC_SET,
// EventRecorder.Component.Operator,
// client);
// }
//
// return;
// }
var flinkBlueGreenDeploymentSpec = context.getBgDeployment().getSpec();
var objectMeta = context.getBgDeployment().getMetadata();
if (flinkBlueGreenDeploymentSpec.getIngress() != null) {
HasMetadata ingress =
getIngress(
objectMeta,
flinkBlueGreenDeploymentSpec.getIngress(),
effectiveConfig,
client.getClient(),
targetDeploymentName + REST_SVC_NAME_SUFFIX);
setOwnerReference(context.getBgDeployment(), Collections.singletonList(ingress));
LOG.info("BLUE GREEN Updating ingress rules {}", ingress);
client.getClient()
.resource(ingress)
.inNamespace(objectMeta.getNamespace())
.createOr(NonDeletingOperation::update);
} else {
LOG.info("BLUE GREEN NOTNOTNOT Updating ingress rules ");
Optional<? extends HasMetadata> ingress;
if (ingressInNetworkingV1(client.getClient())) {
ingress =
client.getSecondaryResource(
io.fabric8.kubernetes.api.model.networking.v1.Ingress.class);
} else {
ingress = client.getSecondaryResource(Ingress.class);
}
ingress.ifPresent(i -> client.getClient().resource(i).delete());
}
}

private static HasMetadata getIngress(
ObjectMeta objectMeta,
FlinkDeploymentSpec spec,
IngressSpec spec,
Configuration effectiveConfig,
KubernetesClient client) {
return getIngress(
objectMeta,
spec,
effectiveConfig,
client,
objectMeta.getName() + REST_SVC_NAME_SUFFIX);
}

private static HasMetadata getIngress(
ObjectMeta objectMeta,
IngressSpec spec,
Configuration effectiveConfig,
KubernetesClient client,
String serviceName) {
Map<String, String> labels =
spec.getIngress().getLabels() == null
? new HashMap<>()
: new HashMap<>(spec.getIngress().getLabels());
spec.getLabels() == null ? new HashMap<>() : new HashMap<>(spec.getLabels());
labels.put(Constants.LABEL_COMPONENT_KEY, LABEL_COMPONENT_INGRESS);
if (ingressInNetworkingV1(client)) {
return new IngressBuilder()
.withNewMetadata()
.withLabels(labels)
.withAnnotations(spec.getIngress().getAnnotations())
.withAnnotations(spec.getAnnotations())
.withName(objectMeta.getName())
.withNamespace(objectMeta.getNamespace())
.endMetadata()
.withNewSpec()
.withIngressClassName(spec.getIngress().getClassName())
.withTls(spec.getIngress().getTls())
.withRules(getIngressRule(objectMeta, spec, effectiveConfig))
.withIngressClassName(spec.getClassName())
.withTls(spec.getTls())
.withRules(getIngressRule(objectMeta, spec, effectiveConfig, serviceName))
.endSpec()
.build();
} else {
List<IngressTLS> ingressTLS =
Optional.ofNullable(spec.getIngress().getTls())
Optional.ofNullable(spec.getTls())
.map(
list ->
list.stream()
Expand All @@ -160,30 +227,33 @@ private static HasMetadata getIngress(
.orElse(Collections.emptyList());
return new io.fabric8.kubernetes.api.model.networking.v1beta1.IngressBuilder()
.withNewMetadata()
.withAnnotations(spec.getIngress().getAnnotations())
.withAnnotations(spec.getAnnotations())
.withLabels(labels)
.withName(objectMeta.getName())
.withNamespace(objectMeta.getNamespace())
.endMetadata()
.withNewSpec()
.withIngressClassName(spec.getIngress().getClassName())
.withIngressClassName(spec.getClassName())
.withTls(ingressTLS)
.withRules(getIngressRuleForV1beta1(objectMeta, spec, effectiveConfig))
.withRules(
getIngressRuleForV1beta1(
objectMeta, spec, effectiveConfig, serviceName))
.endSpec()
.build();
}
}

// Todo remove, creatse new svc with every ingress call
private static IngressRule getIngressRule(
ObjectMeta objectMeta, FlinkDeploymentSpec spec, Configuration effectiveConfig) {
ObjectMeta objectMeta,
IngressSpec spec,
Configuration effectiveConfig,
String serviceName) {
final String clusterId = objectMeta.getName();
final int restPort = effectiveConfig.getInteger(RestOptions.PORT);

URL ingressUrl =
getIngressUrl(
spec.getIngress().getTemplate(),
objectMeta.getName(),
objectMeta.getNamespace());
getIngressUrl(spec.getTemplate(), objectMeta.getName(), objectMeta.getNamespace());

IngressRuleBuilder ingressRuleBuilder = new IngressRuleBuilder();
ingressRuleBuilder.withHttp(
Expand All @@ -192,7 +262,7 @@ private static IngressRule getIngressRule(
.withPathType("ImplementationSpecific")
.withNewBackend()
.withNewService()
.withName(clusterId + REST_SVC_NAME_SUFFIX)
.withName(serviceName)
.withNewPort()
.withNumber(restPort)
.endPort()
Expand All @@ -219,24 +289,22 @@ private static IngressRule getIngressRule(
private static io.fabric8.kubernetes.api.model.networking.v1beta1.IngressRule
getIngressRuleForV1beta1(
ObjectMeta objectMeta,
FlinkDeploymentSpec spec,
Configuration effectiveConfig) {
IngressSpec spec,
Configuration effectiveConfig,
String serviceName) {
final String clusterId = objectMeta.getName();
final int restPort = effectiveConfig.getInteger(RestOptions.PORT);

URL ingressUrl =
getIngressUrl(
spec.getIngress().getTemplate(),
objectMeta.getName(),
objectMeta.getNamespace());
getIngressUrl(spec.getTemplate(), objectMeta.getName(), objectMeta.getNamespace());

io.fabric8.kubernetes.api.model.networking.v1beta1.IngressRuleBuilder ingressRuleBuilder =
new io.fabric8.kubernetes.api.model.networking.v1beta1.IngressRuleBuilder();
ingressRuleBuilder.withHttp(
new io.fabric8.kubernetes.api.model.networking.v1beta1.HTTPIngressRuleValueBuilder()
.addNewPath()
.withNewBackend()
.withServiceName(clusterId + REST_SVC_NAME_SUFFIX)
.withServiceName(serviceName)
.withServicePort(new IntOrString(restPort))
.endBackend()
.endPath()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -350,6 +350,17 @@ public static FlinkDeployment prepareFlinkDeployment(

flinkDeployment.setSpec(spec.getTemplate().getSpec());

// Update Ingress template if exists to prevent path collision between Blue and Green
if (flinkDeployment.getSpec().getIngress() != null) {
flinkDeployment
.getSpec()
.getIngress()
.setTemplate(
blueGreenDeploymentType.toString().toLowerCase()
+ "-"
+ flinkDeployment.getSpec().getIngress().getTemplate());
}

// Deployment metadata
ObjectMeta flinkDeploymentMeta = getDependentObjectMeta(context.getBgDeployment());
flinkDeploymentMeta.setName(childDeploymentName);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1224,6 +1224,6 @@ private static FlinkBlueGreenDeploymentSpec getTestFlinkDeploymentSpec(FlinkVers
var flinkDeploymentTemplateSpec =
FlinkDeploymentTemplateSpec.builder().spec(flinkDeploymentSpec).build();

return new FlinkBlueGreenDeploymentSpec(configuration, flinkDeploymentTemplateSpec);
return new FlinkBlueGreenDeploymentSpec(configuration, null, flinkDeploymentTemplateSpec);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,7 @@ private static FlinkBlueGreenDeployment buildBlueGreenDeployment(
var bgDeploymentSpec =
new FlinkBlueGreenDeploymentSpec(
new HashMap<>(),
null,
FlinkDeploymentTemplateSpec.builder().spec(flinkDeploymentSpec).build());

deployment.setSpec(bgDeploymentSpec);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,32 @@ spec:
properties:
spec:
properties:
ingress:
properties:
annotations:
additionalProperties:
type: string
type: object
className:
type: string
labels:
additionalProperties:
type: string
type: object
template:
type: string
tls:
items:
properties:
hosts:
items:
type: string
type: array
secretName:
type: string
type: object
type: array
type: object
configuration:
additionalProperties:
type: "string"
Expand Down