Skip to content

Commit f7f6cbb

Browse files
committed
Added BlueGreen ingress that switches between active Svc + resolve path conflict on Blue and Green deployment ingresses
1 parent eb99c5a commit f7f6cbb

File tree

6 files changed

+172
-21
lines changed

6 files changed

+172
-21
lines changed

flink-kubernetes-operator-api/src/main/java/org/apache/flink/kubernetes/operator/api/spec/FlinkBlueGreenDeploymentSpec.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,5 +38,7 @@ public class FlinkBlueGreenDeploymentSpec {
3838
@JsonProperty("configuration")
3939
private Map<String, String> configuration;
4040

41+
private IngressSpec ingress;
42+
4143
private FlinkDeploymentTemplateSpec template;
4244
}

flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/bluegreen/BlueGreenDeploymentService.java

Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
package org.apache.flink.kubernetes.operator.controller.bluegreen;
1919

2020
import org.apache.flink.api.common.JobStatus;
21+
import org.apache.flink.configuration.Configuration;
2122
import org.apache.flink.kubernetes.operator.api.FlinkBlueGreenDeployment;
2223
import org.apache.flink.kubernetes.operator.api.FlinkDeployment;
2324
import org.apache.flink.kubernetes.operator.api.bluegreen.BlueGreenDeploymentType;
@@ -29,6 +30,7 @@
2930
import org.apache.flink.kubernetes.operator.config.KubernetesOperatorConfigOptions;
3031
import org.apache.flink.kubernetes.operator.controller.FlinkBlueGreenDeployments;
3132
import org.apache.flink.kubernetes.operator.controller.FlinkResourceContext;
33+
import org.apache.flink.kubernetes.operator.utils.IngressUtils;
3234
import org.apache.flink.kubernetes.operator.utils.bluegreen.BlueGreenUtils;
3335
import org.apache.flink.util.Preconditions;
3436

@@ -605,9 +607,61 @@ public UpdateControl<FlinkBlueGreenDeployment> finalizeBlueGreenDeployment(
605607
context.getDeploymentStatus().setAbortTimestamp(millisToInstantStr(0));
606608
context.getDeploymentStatus().setSavepointTriggerId(null);
607609

610+
reconcileBlueGreenIngress(context, nextState);
608611
return patchStatusUpdateControl(context, nextState, JobStatus.RUNNING, null);
609612
}
610613

614+
/**
615+
* Reconciles the ingress for Blue/Green deployment, pointing to the active deployment.
616+
*
617+
* @param context the Blue/Green context
618+
* @param nextState which deployment (ACTIVE_BLUE or ACTIVE_GREEN) is currently active
619+
*/
620+
public void reconcileBlueGreenIngress(
621+
BlueGreenContext context,
622+
FlinkBlueGreenDeploymentState nextState) {
623+
624+
var bgDeployment = context.getBgDeployment();
625+
var bgSpec = bgDeployment.getSpec();
626+
627+
if (bgSpec.getIngress() == null) {
628+
// No ingress configured, nothing to do
629+
return;
630+
}
631+
632+
FlinkDeployment activeDeployment;
633+
String serviceName;
634+
switch (nextState) {
635+
case ACTIVE_BLUE:
636+
activeDeployment = context.getBlueDeployment();
637+
serviceName = context.getBgDeployment().getMetadata().getName() + "-blue";
638+
break;
639+
case ACTIVE_GREEN:
640+
activeDeployment = context.getGreenDeployment();
641+
serviceName = context.getBgDeployment().getMetadata().getName() + "-green";
642+
break;
643+
default:
644+
LOG.debug("Skipping ingress reconciliation for non-active state: {}", nextState);
645+
return;
646+
}
647+
648+
649+
// Create a FlinkResourceContext for the active deployment to get proper config
650+
FlinkResourceContext<FlinkDeployment> ctx =
651+
context.getCtxFactory()
652+
.getResourceContext(activeDeployment, context.getJosdkContext());
653+
// Get the deployment configuration (includes REST port and all Flink settings)
654+
Configuration deployConfig = ctx.getDeployConfig(activeDeployment.getSpec());
655+
656+
// Call IngressUtils to reconcile the ingress pointing to the active service
657+
IngressUtils.reconcileBlueGreenIngress(
658+
context,
659+
serviceName,
660+
deployConfig,
661+
context.getJosdkContext());
662+
}
663+
664+
611665
// ==================== Common Utility Methods ====================
612666

613667
public static UpdateControl<FlinkBlueGreenDeployment> patchStatusUpdateControl(

flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/IngressUtils.java

Lines changed: 84 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -17,10 +17,16 @@
1717

1818
package org.apache.flink.kubernetes.operator.utils;
1919

20+
import io.javaoperatorsdk.operator.api.reconciler.Context;
2021
import org.apache.flink.configuration.Configuration;
2122
import org.apache.flink.configuration.RestOptions;
23+
import org.apache.flink.kubernetes.operator.api.FlinkBlueGreenDeployment;
24+
import org.apache.flink.kubernetes.operator.api.spec.FlinkBlueGreenDeploymentSpec;
2225
import org.apache.flink.kubernetes.operator.api.spec.FlinkDeploymentSpec;
26+
import org.apache.flink.kubernetes.operator.api.spec.IngressSpec;
27+
import org.apache.flink.kubernetes.operator.api.status.FlinkBlueGreenDeploymentState;
2328
import org.apache.flink.kubernetes.operator.controller.FlinkResourceContext;
29+
import org.apache.flink.kubernetes.operator.controller.bluegreen.BlueGreenContext;
2430
import org.apache.flink.kubernetes.operator.exception.ReconciliationException;
2531
import org.apache.flink.kubernetes.utils.Constants;
2632
import org.apache.flink.util.Preconditions;
@@ -95,7 +101,7 @@ public static void reconcileIngress(
95101
}
96102
var objectMeta = ctx.getResource().getMetadata();
97103
if (spec.getIngress() != null) {
98-
HasMetadata ingress = getIngress(objectMeta, spec, effectiveConfig, client);
104+
HasMetadata ingress = getIngress(objectMeta, spec.getIngress(), effectiveConfig, client);
99105
setOwnerReference(ctx.getResource(), Collections.singletonList(ingress));
100106
LOG.info("Updating ingress rules {}", ingress);
101107
client.resource(ingress)
@@ -116,33 +122,89 @@ public static void reconcileIngress(
116122
}
117123
}
118124

125+
public static void reconcileBlueGreenIngress(
126+
BlueGreenContext context,
127+
String serviceName,
128+
Configuration effectiveConfig,
129+
Context<FlinkBlueGreenDeployment> client
130+
){
131+
// todo see if I need to find way to cover this using this blueGreen method
132+
// todo see if eventRecorder is strictly required (would be nice to have)
133+
// if (!ctx.getOperatorConfig().isManageIngress()) {
134+
// if (spec.getIngress() != null) {
135+
// eventRecorder.triggerEvent(
136+
// ctx.getResource(),
137+
// EventRecorder.Type.Warning,
138+
// INGRESS_MANAGEMENT,
139+
// INGRESS_MANAGEMENT_OFF_BUT_SPEC_SET,
140+
// EventRecorder.Component.Operator,
141+
// client);
142+
// }
143+
//
144+
// return;
145+
// }
146+
var spec = context.getBgDeployment().getSpec();
147+
var objectMeta = context.getBgDeployment().getMetadata();
148+
if (spec.getIngress() != null) {
149+
HasMetadata ingress = getIngress(objectMeta, spec.getIngress(), effectiveConfig, client.getClient(), serviceName);
150+
setOwnerReference(context.getBgDeployment(), Collections.singletonList(ingress));
151+
LOG.info("BLUE GREEN Updating ingress rules {}", ingress);
152+
client.getClient().resource(ingress)
153+
.inNamespace(objectMeta.getNamespace())
154+
.createOr(NonDeletingOperation::update);
155+
} else {
156+
LOG.info("BLUE GREEN NOTNOTNOT Updating ingress rules ");
157+
Optional<? extends HasMetadata> ingress;
158+
if (ingressInNetworkingV1(client.getClient())) {
159+
ingress =
160+
client
161+
.getSecondaryResource(
162+
io.fabric8.kubernetes.api.model.networking.v1.Ingress
163+
.class);
164+
} else {
165+
ingress = client.getSecondaryResource(Ingress.class);
166+
}
167+
ingress.ifPresent(i ->client.getClient().resource(i).delete());
168+
}
169+
}
170+
119171
private static HasMetadata getIngress(
120172
ObjectMeta objectMeta,
121-
FlinkDeploymentSpec spec,
173+
IngressSpec spec,
122174
Configuration effectiveConfig,
123-
KubernetesClient client) {
175+
KubernetesClient client
176+
) {
177+
return getIngress(objectMeta,spec,effectiveConfig,client, objectMeta.getName() + REST_SVC_NAME_SUFFIX);
178+
}
179+
180+
private static HasMetadata getIngress(
181+
ObjectMeta objectMeta,
182+
IngressSpec spec,
183+
Configuration effectiveConfig,
184+
KubernetesClient client,
185+
String serviceName) {
124186
Map<String, String> labels =
125-
spec.getIngress().getLabels() == null
187+
spec.getLabels() == null
126188
? new HashMap<>()
127-
: new HashMap<>(spec.getIngress().getLabels());
189+
: new HashMap<>(spec.getLabels());
128190
labels.put(Constants.LABEL_COMPONENT_KEY, LABEL_COMPONENT_INGRESS);
129191
if (ingressInNetworkingV1(client)) {
130192
return new IngressBuilder()
131193
.withNewMetadata()
132194
.withLabels(labels)
133-
.withAnnotations(spec.getIngress().getAnnotations())
195+
.withAnnotations(spec.getAnnotations())
134196
.withName(objectMeta.getName())
135197
.withNamespace(objectMeta.getNamespace())
136198
.endMetadata()
137199
.withNewSpec()
138-
.withIngressClassName(spec.getIngress().getClassName())
139-
.withTls(spec.getIngress().getTls())
140-
.withRules(getIngressRule(objectMeta, spec, effectiveConfig))
200+
.withIngressClassName(spec.getClassName())
201+
.withTls(spec.getTls())
202+
.withRules(getIngressRule(objectMeta, spec, effectiveConfig, serviceName))
141203
.endSpec()
142204
.build();
143205
} else {
144206
List<IngressTLS> ingressTLS =
145-
Optional.ofNullable(spec.getIngress().getTls())
207+
Optional.ofNullable(spec.getTls())
146208
.map(
147209
list ->
148210
list.stream()
@@ -160,28 +222,29 @@ private static HasMetadata getIngress(
160222
.orElse(Collections.emptyList());
161223
return new io.fabric8.kubernetes.api.model.networking.v1beta1.IngressBuilder()
162224
.withNewMetadata()
163-
.withAnnotations(spec.getIngress().getAnnotations())
225+
.withAnnotations(spec.getAnnotations())
164226
.withLabels(labels)
165227
.withName(objectMeta.getName())
166228
.withNamespace(objectMeta.getNamespace())
167229
.endMetadata()
168230
.withNewSpec()
169-
.withIngressClassName(spec.getIngress().getClassName())
231+
.withIngressClassName(spec.getClassName())
170232
.withTls(ingressTLS)
171-
.withRules(getIngressRuleForV1beta1(objectMeta, spec, effectiveConfig))
233+
.withRules(getIngressRuleForV1beta1(objectMeta, spec, effectiveConfig, serviceName))
172234
.endSpec()
173235
.build();
174236
}
175237
}
176238

239+
// Todo remove, creatse new svc with every ingress call
177240
private static IngressRule getIngressRule(
178-
ObjectMeta objectMeta, FlinkDeploymentSpec spec, Configuration effectiveConfig) {
241+
ObjectMeta objectMeta, IngressSpec spec, Configuration effectiveConfig, String serviceName) {
179242
final String clusterId = objectMeta.getName();
180243
final int restPort = effectiveConfig.getInteger(RestOptions.PORT);
181244

182245
URL ingressUrl =
183246
getIngressUrl(
184-
spec.getIngress().getTemplate(),
247+
spec.getTemplate(),
185248
objectMeta.getName(),
186249
objectMeta.getNamespace());
187250

@@ -192,7 +255,7 @@ private static IngressRule getIngressRule(
192255
.withPathType("ImplementationSpecific")
193256
.withNewBackend()
194257
.withNewService()
195-
.withName(clusterId + REST_SVC_NAME_SUFFIX)
258+
.withName(serviceName)
196259
.withNewPort()
197260
.withNumber(restPort)
198261
.endPort()
@@ -219,14 +282,15 @@ private static IngressRule getIngressRule(
219282
private static io.fabric8.kubernetes.api.model.networking.v1beta1.IngressRule
220283
getIngressRuleForV1beta1(
221284
ObjectMeta objectMeta,
222-
FlinkDeploymentSpec spec,
223-
Configuration effectiveConfig) {
285+
IngressSpec spec,
286+
Configuration effectiveConfig,
287+
String serviceName) {
224288
final String clusterId = objectMeta.getName();
225289
final int restPort = effectiveConfig.getInteger(RestOptions.PORT);
226290

227291
URL ingressUrl =
228292
getIngressUrl(
229-
spec.getIngress().getTemplate(),
293+
spec.getTemplate(),
230294
objectMeta.getName(),
231295
objectMeta.getNamespace());
232296

@@ -236,7 +300,7 @@ private static IngressRule getIngressRule(
236300
new io.fabric8.kubernetes.api.model.networking.v1beta1.HTTPIngressRuleValueBuilder()
237301
.addNewPath()
238302
.withNewBackend()
239-
.withServiceName(clusterId + REST_SVC_NAME_SUFFIX)
303+
.withServiceName(serviceName)
240304
.withServicePort(new IntOrString(restPort))
241305
.endBackend()
242306
.endPath()

flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/bluegreen/BlueGreenUtils.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -350,6 +350,11 @@ public static FlinkDeployment prepareFlinkDeployment(
350350

351351
flinkDeployment.setSpec(spec.getTemplate().getSpec());
352352

353+
// Update Ingress template if exists to prevent path collision between Blue and Green
354+
if (flinkDeployment.getSpec().getIngress() != null) {
355+
flinkDeployment.getSpec().getIngress().setTemplate(blueGreenDeploymentType.toString().toLowerCase() + "-" + flinkDeployment.getSpec().getIngress().getTemplate());
356+
}
357+
353358
// Deployment metadata
354359
ObjectMeta flinkDeploymentMeta = getDependentObjectMeta(context.getBgDeployment());
355360
flinkDeploymentMeta.setName(childDeploymentName);

flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkBlueGreenDeploymentControllerTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1224,6 +1224,6 @@ private static FlinkBlueGreenDeploymentSpec getTestFlinkDeploymentSpec(FlinkVers
12241224
var flinkDeploymentTemplateSpec =
12251225
FlinkDeploymentTemplateSpec.builder().spec(flinkDeploymentSpec).build();
12261226

1227-
return new FlinkBlueGreenDeploymentSpec(configuration, flinkDeploymentTemplateSpec);
1227+
return new FlinkBlueGreenDeploymentSpec(configuration, null,flinkDeploymentTemplateSpec);
12281228
}
12291229
}

helm/flink-kubernetes-operator/crds/flinkbluegreendeployments.flink.apache.org-v1.yml

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,32 @@ spec:
2424
properties:
2525
spec:
2626
properties:
27+
ingress:
28+
properties:
29+
annotations:
30+
additionalProperties:
31+
type: string
32+
type: object
33+
className:
34+
type: string
35+
labels:
36+
additionalProperties:
37+
type: string
38+
type: object
39+
template:
40+
type: string
41+
tls:
42+
items:
43+
properties:
44+
hosts:
45+
items:
46+
type: string
47+
type: array
48+
secretName:
49+
type: string
50+
type: object
51+
type: array
52+
type: object
2753
configuration:
2854
additionalProperties:
2955
type: "string"

0 commit comments

Comments
 (0)