Skip to content

Commit ec1382f

Browse files
committed
Improving/adding E2E tests for blue/green deployments. Checkstyle fixes.
1 parent a3ab6d6 commit ec1382f

File tree

10 files changed

+165
-49
lines changed

10 files changed

+165
-49
lines changed

e2e-tests/data/bluegreen.yaml renamed to e2e-tests/data/bluegreen-laststate.yaml

Lines changed: 24 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -29,34 +29,49 @@ spec:
2929
flinkVersion: v1_20
3030
flinkConfiguration:
3131
rest.port: "8081"
32-
execution.checkpointing.interval: "1 min"
32+
execution.checkpointing.interval: "10s"
3333
execution.checkpointing.storage: "filesystem"
3434
state.backend.incremental: "true"
3535
state.checkpoints.dir: "file:///flink-data/checkpoints"
36-
state.checkpoints.num-retained: "50"
36+
state.savepoints.dir: "file:///flink-data/savepoints"
37+
state.checkpoints.num-retained: "5"
3738
taskmanager.numberOfTaskSlots: "1"
3839
serviceAccount: flink
3940
jobManager:
4041
resource:
41-
memory: "2048m"
42+
memory: 1G
4243
cpu: 1
4344
podTemplate:
4445
spec:
4546
containers:
4647
- name: flink-main-container
4748
volumeMounts:
48-
- mountPath: /flink-data
49-
name: flink-volume
49+
- mountPath: /flink-data/checkpoints
50+
name: checkpoint-volume
51+
- mountPath: /flink-data/savepoints
52+
name: savepoint-volume
5053
volumes:
51-
- name: flink-volume
54+
- name: checkpoint-volume
5255
hostPath:
53-
path: /tmp/flink
56+
# directory location on host
57+
path: /tmp/flink/checkpoints
58+
type: Directory
59+
- name: savepoint-volume
60+
hostPath:
61+
# directory location on host
62+
path: /tmp/flink/savepoints
5463
type: Directory
5564
taskManager:
5665
resource:
57-
memory: "2048m"
66+
memory: 2G
5867
cpu: 1
5968
job:
6069
jarURI: local:///opt/flink/examples/streaming/StateMachineExample.jar
6170
parallelism: 1
62-
upgradeMode: stateless
71+
entryClass: org.apache.flink.streaming.examples.statemachine.StateMachineExample
72+
args:
73+
- "--error-rate"
74+
- "0.15"
75+
- "--sleep"
76+
- "30"
77+
upgradeMode: last-state
Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,49 @@
1+
################################################################################
2+
# Licensed to the Apache Software Foundation (ASF) under one
3+
# or more contributor license agreements. See the NOTICE file
4+
# distributed with this work for additional information
5+
# regarding copyright ownership. The ASF licenses this file
6+
# to you under the Apache License, Version 2.0 (the
7+
# "License"); you may not use this file except in compliance
8+
# with the License. You may obtain a copy of the License at
9+
#
10+
# http://www.apache.org/licenses/LICENSE-2.0
11+
#
12+
# Unless required by applicable law or agreed to in writing, software
13+
# distributed under the License is distributed on an "AS IS" BASIS,
14+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
# See the License for the specific language governing permissions and
16+
# limitations under the License.
17+
################################################################################
18+
19+
apiVersion: flink.apache.org/v1beta1
20+
kind: FlinkBlueGreenDeployment
21+
metadata:
22+
name: basic-bluegreen-example
23+
spec:
24+
template:
25+
spec:
26+
image: flink:1.20
27+
flinkVersion: v1_20
28+
flinkConfiguration:
29+
rest.port: "8081"
30+
taskmanager.numberOfTaskSlots: "1"
31+
serviceAccount: flink
32+
jobManager:
33+
resource:
34+
memory: 1G
35+
cpu: 1
36+
taskManager:
37+
resource:
38+
memory: 2G
39+
cpu: 1
40+
job:
41+
jarURI: local:///opt/flink/examples/streaming/StateMachineExample.jar
42+
parallelism: 1
43+
entryClass: org.apache.flink.streaming.examples.statemachine.StateMachineExample
44+
args:
45+
- "--error-rate"
46+
- "0.15"
47+
- "--sleep"
48+
- "30"
49+
upgradeMode: stateless

e2e-tests/test_bluegreen.sh renamed to e2e-tests/test_bluegreen_laststate.sh

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@ BG_CLUSTER_ID=$CLUSTER_ID
3131
BLUE_CLUSTER_ID="basic-bluegreen-example-blue"
3232
GREEN_CLUSTER_ID="basic-bluegreen-example-green"
3333

34-
APPLICATION_YAML="${SCRIPT_DIR}/data/bluegreen.yaml"
34+
APPLICATION_YAML="${SCRIPT_DIR}/data/bluegreen-laststate.yaml"
3535
APPLICATION_IDENTIFIER="flinkbgdep/$CLUSTER_ID"
3636
BLUE_APPLICATION_IDENTIFIER="flinkdep/$BLUE_CLUSTER_ID"
3737
GREEN_APPLICATION_IDENTIFIER="flinkdep/$GREEN_CLUSTER_ID"
@@ -42,10 +42,11 @@ TIMEOUT=300
4242
#echo "APPLICATION_IDENTIFIER " $APPLICATION_IDENTIFIER
4343
#echo "BLUE_APPLICATION_IDENTIFIER " $BLUE_APPLICATION_IDENTIFIER
4444

45-
on_exit cleanup_and_exit "$APPLICATION_YAML" $TIMEOUT $BG_CLUSTER_ID
45+
#on_exit cleanup_and_exit "$APPLICATION_YAML" $TIMEOUT $BG_CLUSTER_ID
4646

4747
retry_times 5 30 "kubectl apply -f $APPLICATION_YAML" || exit 1
4848

49+
sleep 1
4950
wait_for_jobmanager_running $BLUE_CLUSTER_ID $TIMEOUT
5051
wait_for_status $BLUE_APPLICATION_IDENTIFIER '.status.lifecycleState' STABLE ${TIMEOUT} || exit 1
5152
wait_for_status $APPLICATION_IDENTIFIER '.status.jobStatus.state' RUNNING ${TIMEOUT} || exit 1
Lines changed: 60 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,60 @@
1+
#!/usr/bin/env bash
2+
################################################################################
3+
# Licensed to the Apache Software Foundation (ASF) under one
4+
# or more contributor license agreements. See the NOTICE file
5+
# distributed with this work for additional information
6+
# regarding copyright ownership. The ASF licenses this file
7+
# to you under the Apache License, Version 2.0 (the
8+
# "License"); you may not use this file except in compliance
9+
# with the License. You may obtain a copy of the License at
10+
#
11+
# http://www.apache.org/licenses/LICENSE-2.0
12+
#
13+
# Unless required by applicable law or agreed to in writing, software
14+
# distributed under the License is distributed on an "AS IS" BASIS,
15+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16+
# See the License for the specific language governing permissions and
17+
# limitations under the License.
18+
################################################################################
19+
20+
# This script tests the Flink Blue/Green Deployments support as follows:
21+
# - Create a FlinkBlueGreenDeployment which automatically starts a "Blue" FlinkDeployment
22+
# - Once this setup is stable, we trigger a transition which will create the "Green" FlinkDeployment
23+
# - Once it's stable, verify the "Blue" FlinkDeployment is torn down
24+
# - Perform additional validation(s) before exiting
25+
26+
SCRIPT_DIR=$(dirname "$(readlink -f "$0")")
27+
source "${SCRIPT_DIR}/utils.sh"
28+
29+
CLUSTER_ID="basic-bluegreen-example"
30+
BG_CLUSTER_ID=$CLUSTER_ID
31+
BLUE_CLUSTER_ID="basic-bluegreen-example-blue"
32+
GREEN_CLUSTER_ID="basic-bluegreen-example-green"
33+
34+
APPLICATION_YAML="${SCRIPT_DIR}/data/bluegreen-stateless.yaml"
35+
APPLICATION_IDENTIFIER="flinkbgdep/$CLUSTER_ID"
36+
BLUE_APPLICATION_IDENTIFIER="flinkdep/$BLUE_CLUSTER_ID"
37+
GREEN_APPLICATION_IDENTIFIER="flinkdep/$GREEN_CLUSTER_ID"
38+
TIMEOUT=300
39+
40+
on_exit cleanup_and_exit "$APPLICATION_YAML" $TIMEOUT $BG_CLUSTER_ID
41+
42+
retry_times 5 30 "kubectl apply -f $APPLICATION_YAML" || exit 1
43+
44+
sleep 1
45+
wait_for_jobmanager_running $BLUE_CLUSTER_ID $TIMEOUT
46+
wait_for_status $BLUE_APPLICATION_IDENTIFIER '.status.lifecycleState' STABLE ${TIMEOUT} || exit 1
47+
wait_for_status $APPLICATION_IDENTIFIER '.status.jobStatus.state' RUNNING ${TIMEOUT} || exit 1
48+
wait_for_status $APPLICATION_IDENTIFIER '.status.blueGreenState' ACTIVE_BLUE ${TIMEOUT} || exit 1
49+
50+
blue_job_id=$(kubectl get -oyaml flinkdep/basic-bluegreen-example-blue | yq '.status.jobStatus.jobId')
51+
52+
echo "PATCHING B/G deployment..."
53+
kubectl patch flinkbgdep ${BG_CLUSTER_ID} --type merge --patch '{"spec":{"template":{"spec":{"flinkConfiguration":{"rest.port":"8082","taskmanager.numberOfTaskSlots":"2"}}}}}'
54+
55+
wait_for_status $GREEN_APPLICATION_IDENTIFIER '.status.lifecycleState' STABLE ${TIMEOUT} || exit 1
56+
kubectl wait --for=delete deployment --timeout=${TIMEOUT}s --selector="app=${BLUE_CLUSTER_ID}"
57+
wait_for_status $APPLICATION_IDENTIFIER '.status.jobStatus.state' RUNNING ${TIMEOUT} || exit 1
58+
wait_for_status $APPLICATION_IDENTIFIER '.status.blueGreenState' ACTIVE_GREEN ${TIMEOUT} || exit 1
59+
60+
echo "Successfully run the Flink Blue/Green Deployments test"

flink-kubernetes-operator-api/src/main/java/org/apache/flink/kubernetes/operator/api/status/FlinkBlueGreenDeploymentStatus.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,8 @@ public class FlinkBlueGreenDeploymentStatus {
5353
/** Timestamp when the deployment became READY/STABLE. Used to determine when to delete it. */
5454
private String deploymentReadyTimestamp;
5555

56-
/** Persisted triggerId to track transition with savepoint. Only used with UpgradeMode.SAVEPOINT */
56+
/**
57+
* Persisted triggerId to track transition with savepoint. Only used with UpgradeMode.SAVEPOINT
58+
*/
5759
private String savepointTriggerId;
5860
}

flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/bluegreen/BlueGreenDeploymentService.java

Lines changed: 14 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -203,9 +203,10 @@ public boolean monitorSavepoint(
203203
BlueGreenContext context, DeploymentType currentDeploymentType) {
204204

205205
FlinkResourceContext<FlinkDeployment> ctx =
206-
context.getCtxFactory().getResourceContext(
207-
context.getDeploymentByType(currentDeploymentType),
208-
context.getJosdkContext());
206+
context.getCtxFactory()
207+
.getResourceContext(
208+
context.getDeploymentByType(currentDeploymentType),
209+
context.getJosdkContext());
209210

210211
String savepointTriggerId = context.getDeploymentStatus().getSavepointTriggerId();
211212
var savepointFetchResult = fetchSavepointInfo(ctx, savepointTriggerId);
@@ -214,8 +215,7 @@ public boolean monitorSavepoint(
214215
}
215216

216217
private Savepoint configureInitialSavepoint(
217-
BlueGreenContext context,
218-
FlinkDeployment currentFlinkDeployment) {
218+
BlueGreenContext context, FlinkDeployment currentFlinkDeployment) {
219219

220220
FlinkResourceContext<FlinkDeployment> ctx =
221221
context.getCtxFactory()
@@ -227,11 +227,16 @@ private Savepoint configureInitialSavepoint(
227227
var savepointFetchResult = fetchSavepointInfo(ctx, savepointTriggerId);
228228

229229
org.apache.flink.core.execution.SavepointFormatType coreSavepointFormatType =
230-
ctx.getObserveConfig().get(KubernetesOperatorConfigOptions.OPERATOR_SAVEPOINT_FORMAT_TYPE);
230+
ctx.getObserveConfig()
231+
.get(KubernetesOperatorConfigOptions.OPERATOR_SAVEPOINT_FORMAT_TYPE);
231232

232-
var savepointFormatType = SavepointFormatType.valueOf(coreSavepointFormatType.toString());
233+
var savepointFormatType =
234+
SavepointFormatType.valueOf(coreSavepointFormatType.toString());
233235

234-
return Savepoint.of(savepointFetchResult.getLocation(), SnapshotTriggerType.MANUAL, savepointFormatType);
236+
return Savepoint.of(
237+
savepointFetchResult.getLocation(),
238+
SnapshotTriggerType.MANUAL,
239+
savepointFormatType);
235240
}
236241

237242
// Else we start looking for the last checkpoint if needed
@@ -244,8 +249,7 @@ private Savepoint configureInitialSavepoint(
244249
}
245250

246251
private boolean handleSavepoint(
247-
BlueGreenContext context,
248-
FlinkDeployment currentFlinkDeployment) {
252+
BlueGreenContext context, FlinkDeployment currentFlinkDeployment) {
249253

250254
if (!isSavepointRequired(context)) {
251255
return false;

flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/bluegreen/BlueGreenStateHandlerRegistry.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -58,7 +58,8 @@ ACTIVE_GREEN, new ActiveStateHandler(ACTIVE_GREEN, deploymentService),
5858
SAVEPOINTING_BLUE,
5959
new SavepointingStateHandler(SAVEPOINTING_BLUE, deploymentService),
6060
SAVEPOINTING_GREEN,
61-
new SavepointingStateHandler(SAVEPOINTING_GREEN, deploymentService));
61+
new SavepointingStateHandler(
62+
SAVEPOINTING_GREEN, deploymentService));
6263
}
6364

6465
/**

flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/bluegreen/handlers/SavepointingStateHandler.java

Lines changed: 6 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -11,9 +11,7 @@
1111
import static org.apache.flink.kubernetes.operator.controller.bluegreen.BlueGreenDeploymentService.patchStatusUpdateControl;
1212
import static org.apache.flink.kubernetes.operator.utils.bluegreen.BlueGreenUtils.getReconciliationReschedInterval;
1313

14-
/**
15-
* State handler for managing Blue/Green deployment savepointing transitions.
16-
*/
14+
/** State handler for managing Blue/Green deployment savepointing transitions. */
1715
public class SavepointingStateHandler extends AbstractBlueGreenStateHandler {
1816

1917
public SavepointingStateHandler(
@@ -29,11 +27,11 @@ public UpdateControl<FlinkBlueGreenDeployment> handle(BlueGreenContext context)
2927

3028
// Savepoint complete, continue with the transition
3129
if (isSavepointReady) {
32-
var nextState = getSupportedState() == FlinkBlueGreenDeploymentState.SAVEPOINTING_BLUE ?
33-
FlinkBlueGreenDeploymentState.ACTIVE_BLUE :
34-
FlinkBlueGreenDeploymentState.ACTIVE_GREEN;
35-
return patchStatusUpdateControl(context, nextState, null)
36-
.rescheduleAfter(500);
30+
var nextState =
31+
getSupportedState() == FlinkBlueGreenDeploymentState.SAVEPOINTING_BLUE
32+
? FlinkBlueGreenDeploymentState.ACTIVE_BLUE
33+
: FlinkBlueGreenDeploymentState.ACTIVE_GREEN;
34+
return patchStatusUpdateControl(context, nextState, null).rescheduleAfter(500);
3735
}
3836

3937
return UpdateControl.<FlinkBlueGreenDeployment>noUpdate()

flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/bluegreen/BlueGreenSpecUtils.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -225,8 +225,7 @@ public static String triggerSavepoint(FlinkResourceContext<FlinkDeployment> ctx)
225225
}
226226

227227
public static SavepointFetchResult fetchSavepointInfo(
228-
FlinkResourceContext<FlinkDeployment> ctx,
229-
String triggerId) {
228+
FlinkResourceContext<FlinkDeployment> ctx, String triggerId) {
230229
return ctx.getFlinkService()
231230
.fetchSavepointInfo(
232231
triggerId,

flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkBlueGreenDeploymentControllerTest.java

Lines changed: 3 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -41,12 +41,9 @@
4141
import org.apache.flink.kubernetes.operator.config.FlinkConfigManager;
4242
import org.apache.flink.runtime.jobgraph.SavepointConfigOptions;
4343

44-
import io.fabric8.kubernetes.api.model.Event;
45-
import io.fabric8.kubernetes.api.model.EventBuilder;
4644
import io.fabric8.kubernetes.api.model.ObjectMetaBuilder;
4745
import io.fabric8.kubernetes.client.KubernetesClient;
4846
import io.fabric8.kubernetes.client.server.mock.EnableKubernetesMockClient;
49-
import io.fabric8.kubernetes.client.server.mock.KubernetesMockServer;
5047
import io.javaoperatorsdk.operator.api.reconciler.Context;
5148
import io.javaoperatorsdk.operator.api.reconciler.UpdateControl;
5249
import org.jetbrains.annotations.NotNull;
@@ -94,18 +91,8 @@ public class FlinkBlueGreenDeploymentControllerTest {
9491
private Context<FlinkBlueGreenDeployment> context;
9592
private TestingFlinkBlueGreenDeploymentController testController;
9693

97-
private KubernetesMockServer mockServer;
9894
private KubernetesClient kubernetesClient;
9995

100-
Event mockedEvent =
101-
new EventBuilder()
102-
.withNewMetadata()
103-
.withName("name")
104-
.endMetadata()
105-
.withType("type")
106-
.withReason("reason")
107-
.build();
108-
10996
@BeforeEach
11097
public void setup() {
11198
flinkService = new TestingFlinkService(kubernetesClient);
@@ -159,7 +146,8 @@ public void verifyBasicTransition(
159146

160147
@NotNull
161148
private TestingFlinkBlueGreenDeploymentController.BlueGreenReconciliationResult handleSavepoint(
162-
TestingFlinkBlueGreenDeploymentController.BlueGreenReconciliationResult rs) throws Exception {
149+
TestingFlinkBlueGreenDeploymentController.BlueGreenReconciliationResult rs)
150+
throws Exception {
163151
var triggers = flinkService.getSavepointTriggers();
164152
triggers.clear();
165153

@@ -188,8 +176,7 @@ private TestingFlinkBlueGreenDeploymentController.BlueGreenReconciliationResult
188176
rs = reconcile(rs.deployment);
189177

190178
assertEquals(
191-
FlinkBlueGreenDeploymentState.ACTIVE_BLUE,
192-
rs.reconciledStatus.getBlueGreenState());
179+
FlinkBlueGreenDeploymentState.ACTIVE_BLUE, rs.reconciledStatus.getBlueGreenState());
193180
assertTrue(rs.updateControl.isPatchStatus());
194181
assertTrue(rs.updateControl.getScheduleDelay().isPresent());
195182
return rs;

0 commit comments

Comments
 (0)