Skip to content

Commit 98d041d

Browse files
authored
[FLINK-38548] Rename only the name of child deployment for blue/green deployments
1 parent c479a46 commit 98d041d

File tree

2 files changed

+127
-35
lines changed

2 files changed

+127
-35
lines changed

flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/bluegreen/BlueGreenUtils.java

Lines changed: 9 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -59,27 +59,6 @@ public class BlueGreenUtils {
5959

6060
// ==================== Spec Operations ====================
6161

62-
/**
63-
* Adjusts name references in a spec by replacing deployment names with child deployment names.
64-
*
65-
* @param spec the spec to adjust
66-
* @param deploymentName the original deployment name
67-
* @param childDeploymentName the child deployment name to replace with
68-
* @param wrapperKey the JSON wrapper key
69-
* @param valueType the spec type
70-
* @return adjusted spec with name references updated
71-
*/
72-
public static <T> T adjustNameReferences(
73-
T spec,
74-
String deploymentName,
75-
String childDeploymentName,
76-
String wrapperKey,
77-
Class<T> valueType) {
78-
String serializedSpec = SpecUtils.writeSpecAsJSON(spec, wrapperKey);
79-
String replacedSerializedSpec = serializedSpec.replace(deploymentName, childDeploymentName);
80-
return SpecUtils.readSpecFromJSON(replacedSerializedSpec, wrapperKey, valueType);
81-
}
82-
8362
/**
8463
* Checks if the Blue/Green deployment spec has changed compared to the last reconciled spec.
8564
*
@@ -341,40 +320,35 @@ public static FlinkDeployment prepareFlinkDeployment(
341320
ObjectMeta bgMeta) {
342321
// Deployment
343322
FlinkDeployment flinkDeployment = new FlinkDeployment();
344-
FlinkBlueGreenDeploymentSpec spec = context.getBgDeployment().getSpec();
323+
FlinkBlueGreenDeploymentSpec originalSpec = context.getBgDeployment().getSpec();
345324

346325
String childDeploymentName =
347326
bgMeta.getName() + "-" + blueGreenDeploymentType.toString().toLowerCase();
348327

349-
FlinkBlueGreenDeploymentSpec adjustedSpec =
350-
adjustNameReferences(
351-
spec,
352-
bgMeta.getName(),
353-
childDeploymentName,
328+
// Create a deep copy of the spec to avoid mutating the original
329+
FlinkBlueGreenDeploymentSpec spec =
330+
SpecUtils.readSpecFromJSON(
331+
SpecUtils.writeSpecAsJSON(originalSpec, "spec"),
354332
"spec",
355333
FlinkBlueGreenDeploymentSpec.class);
356334

357335
// The Blue/Green initialSavepointPath is only used for first-time deployments
358336
if (isFirstDeployment) {
359337
String initialSavepointPath =
360-
adjustedSpec.getTemplate().getSpec().getJob().getInitialSavepointPath();
338+
spec.getTemplate().getSpec().getJob().getInitialSavepointPath();
361339
if (initialSavepointPath != null && !initialSavepointPath.isEmpty()) {
362340
LOG.info("Using initialSavepointPath: " + initialSavepointPath);
363-
adjustedSpec
364-
.getTemplate()
365-
.getSpec()
366-
.getJob()
367-
.setInitialSavepointPath(initialSavepointPath);
341+
spec.getTemplate().getSpec().getJob().setInitialSavepointPath(initialSavepointPath);
368342
} else {
369343
LOG.info("Clean startup with no checkpoint/savepoint restoration");
370344
}
371345
} else if (lastCheckpoint != null) {
372346
String location = lastCheckpoint.getLocation().replace("file:", "");
373347
LOG.info("Using Blue/Green savepoint/checkpoint: " + location);
374-
adjustedSpec.getTemplate().getSpec().getJob().setInitialSavepointPath(location);
348+
spec.getTemplate().getSpec().getJob().setInitialSavepointPath(location);
375349
}
376350

377-
flinkDeployment.setSpec(adjustedSpec.getTemplate().getSpec());
351+
flinkDeployment.setSpec(spec.getTemplate().getSpec());
378352

379353
// Deployment metadata
380354
ObjectMeta flinkDeploymentMeta = getDependentObjectMeta(context.getBgDeployment());
Lines changed: 118 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,118 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.flink.kubernetes.operator.utils.bluegreen;
19+
20+
import org.apache.flink.kubernetes.operator.api.FlinkBlueGreenDeployment;
21+
import org.apache.flink.kubernetes.operator.api.FlinkDeployment;
22+
import org.apache.flink.kubernetes.operator.api.bluegreen.BlueGreenDeploymentType;
23+
import org.apache.flink.kubernetes.operator.api.spec.ConfigObjectNode;
24+
import org.apache.flink.kubernetes.operator.api.spec.FlinkBlueGreenDeploymentSpec;
25+
import org.apache.flink.kubernetes.operator.api.spec.FlinkDeploymentSpec;
26+
import org.apache.flink.kubernetes.operator.api.spec.FlinkDeploymentTemplateSpec;
27+
import org.apache.flink.kubernetes.operator.api.spec.JobSpec;
28+
import org.apache.flink.kubernetes.operator.api.spec.UpgradeMode;
29+
import org.apache.flink.kubernetes.operator.api.status.FlinkBlueGreenDeploymentStatus;
30+
import org.apache.flink.kubernetes.operator.api.utils.SpecUtils;
31+
import org.apache.flink.kubernetes.operator.controller.bluegreen.BlueGreenContext;
32+
33+
import io.fabric8.kubernetes.api.model.ObjectMetaBuilder;
34+
import org.junit.jupiter.api.Test;
35+
36+
import java.util.HashMap;
37+
import java.util.Map;
38+
import java.util.UUID;
39+
40+
import static org.junit.jupiter.api.Assertions.assertEquals;
41+
42+
/** Tests for {@link BlueGreenUtils}. */
43+
public class BlueGreenUtilsTest {
44+
45+
private static final String TEST_NAMESPACE = "test-namespace";
46+
47+
@Test
48+
public void testPrepareFlinkDeploymentWithoutNameReplacement() {
49+
String parentDeploymentName = "my-app";
50+
FlinkBlueGreenDeployment bgDeployment =
51+
buildBlueGreenDeployment(parentDeploymentName, TEST_NAMESPACE);
52+
53+
// Add configuration that contains the deployment name in values
54+
Map<String, String> flinkConfig =
55+
bgDeployment.getSpec().getTemplate().getSpec().getFlinkConfiguration().asFlatMap();
56+
flinkConfig.put(
57+
"high-availability.storageDir",
58+
"s3://" + parentDeploymentName + "/highavailability");
59+
flinkConfig.put("metrics.scope.jm", parentDeploymentName + ".jm");
60+
bgDeployment.getSpec().getTemplate().getSpec().setFlinkConfiguration(flinkConfig);
61+
62+
BlueGreenContext context = createContext(bgDeployment);
63+
64+
// Test: Prepare a BLUE deployment
65+
FlinkDeployment blueDeployment =
66+
BlueGreenUtils.prepareFlinkDeployment(
67+
context,
68+
BlueGreenDeploymentType.BLUE,
69+
null,
70+
true,
71+
bgDeployment.getMetadata());
72+
73+
// Verify child deployment name is correctly set in metadata
74+
String expectedChildName = parentDeploymentName + "-blue";
75+
assertEquals(expectedChildName, blueDeployment.getMetadata().getName());
76+
77+
// Verify configuration values that contain the parent name are NOT replaced
78+
Map<String, String> resultFlinkConfig =
79+
blueDeployment.getSpec().getFlinkConfiguration().asFlatMap();
80+
assertEquals(
81+
"s3://" + parentDeploymentName + "/highavailability",
82+
resultFlinkConfig.get("high-availability.storageDir"));
83+
assertEquals(parentDeploymentName + ".jm", resultFlinkConfig.get("metrics.scope.jm"));
84+
}
85+
86+
private static FlinkBlueGreenDeployment buildBlueGreenDeployment(
87+
String name, String namespace) {
88+
var deployment = new FlinkBlueGreenDeployment();
89+
deployment.setMetadata(
90+
new ObjectMetaBuilder()
91+
.withName(name)
92+
.withNamespace(namespace)
93+
.withUid(UUID.randomUUID().toString())
94+
.build());
95+
96+
var flinkDeploymentSpec =
97+
FlinkDeploymentSpec.builder()
98+
.flinkConfiguration(new ConfigObjectNode())
99+
.job(JobSpec.builder().upgradeMode(UpgradeMode.STATELESS).build())
100+
.build();
101+
102+
var bgDeploymentSpec =
103+
new FlinkBlueGreenDeploymentSpec(
104+
new HashMap<>(),
105+
FlinkDeploymentTemplateSpec.builder().spec(flinkDeploymentSpec).build());
106+
107+
deployment.setSpec(bgDeploymentSpec);
108+
return deployment;
109+
}
110+
111+
private BlueGreenContext createContext(FlinkBlueGreenDeployment bgDeployment) {
112+
FlinkBlueGreenDeploymentStatus status = new FlinkBlueGreenDeploymentStatus();
113+
status.setLastReconciledSpec(SpecUtils.writeSpecAsJSON(bgDeployment.getSpec(), "spec"));
114+
bgDeployment.setStatus(status);
115+
116+
return new BlueGreenContext(bgDeployment, status, null, null, null);
117+
}
118+
}

0 commit comments

Comments
 (0)