Skip to content

Commit a3ab6d6

Browse files
committed
Updated unit test to assert Savepointing. Checkstyle fixes
1 parent c178eae commit a3ab6d6

File tree

4 files changed

+55
-4
lines changed

4 files changed

+55
-4
lines changed

flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/bluegreen/BlueGreenDeploymentService.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -118,7 +118,7 @@ public UpdateControl<FlinkBlueGreenDeployment> checkAndInitiateDeployment(
118118
if (specDiff == BlueGreenDiffType.TRANSITION) {
119119
if (handleSavepoint(context, currentFlinkDeployment)) {
120120
// This is the only portion where the last reconciled spec is not set,
121-
// so we can process TRANSITION after the savepoint is done
121+
// so we can reprocess TRANSITION after the savepoint is done
122122
var savepointingState = calculateSavepointingState(currentDeploymentType);
123123
return patchStatusUpdateControl(context, savepointingState, null)
124124
.rescheduleAfter(getReconciliationReschedInterval(context));

flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/bluegreen/handlers/SavepointingStateHandler.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,15 +1,19 @@
11
package org.apache.flink.kubernetes.operator.controller.bluegreen.handlers;
22

3-
import io.javaoperatorsdk.operator.api.reconciler.UpdateControl;
43
import org.apache.flink.kubernetes.operator.api.FlinkBlueGreenDeployment;
54
import org.apache.flink.kubernetes.operator.api.bluegreen.DeploymentType;
65
import org.apache.flink.kubernetes.operator.api.status.FlinkBlueGreenDeploymentState;
76
import org.apache.flink.kubernetes.operator.controller.bluegreen.BlueGreenContext;
87
import org.apache.flink.kubernetes.operator.controller.bluegreen.BlueGreenDeploymentService;
98

9+
import io.javaoperatorsdk.operator.api.reconciler.UpdateControl;
10+
1011
import static org.apache.flink.kubernetes.operator.controller.bluegreen.BlueGreenDeploymentService.patchStatusUpdateControl;
1112
import static org.apache.flink.kubernetes.operator.utils.bluegreen.BlueGreenUtils.getReconciliationReschedInterval;
1213

14+
/**
15+
* State handler for managing Blue/Green deployment savepointing transitions.
16+
*/
1317
public class SavepointingStateHandler extends AbstractBlueGreenStateHandler {
1418

1519
public SavepointingStateHandler(

flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/bluegreen/BlueGreenSpecUtils.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,6 @@
1717

1818
package org.apache.flink.kubernetes.operator.utils.bluegreen;
1919

20-
import lombok.SneakyThrows;
2120
import org.apache.flink.api.common.JobID;
2221
import org.apache.flink.configuration.CheckpointingOptions;
2322
import org.apache.flink.configuration.ConfigOption;
@@ -40,6 +39,7 @@
4039
import org.apache.flink.util.Preconditions;
4140

4241
import io.fabric8.kubernetes.api.model.ObjectMeta;
42+
import lombok.SneakyThrows;
4343
import org.slf4j.Logger;
4444
import org.slf4j.LoggerFactory;
4545

flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkBlueGreenDeploymentControllerTest.java

Lines changed: 48 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -142,12 +142,59 @@ public void verifyBasicTransition(
142142
simulateChangeInSpec(
143143
rs.deployment, customValue, ALT_DELETION_DELAY_VALUE, initialSavepointPath);
144144

145-
// Transitioning to the Green deployment
146145
var expectedSavepointPath =
147146
upgradeMode == UpgradeMode.LAST_STATE ? TEST_CHECKPOINT_PATH : initialSavepointPath;
147+
148+
if (upgradeMode == UpgradeMode.SAVEPOINT) {
149+
// In this case there will ALWAYS be a savepoint generated with this value,
150+
// regardless of the initialSavepointPath
151+
expectedSavepointPath = "savepoint_1";
152+
rs = handleSavepoint(rs);
153+
}
154+
155+
// Transitioning to the Green deployment
156+
148157
testTransitionToGreen(rs, customValue, expectedSavepointPath);
149158
}
150159

160+
@NotNull
161+
private TestingFlinkBlueGreenDeploymentController.BlueGreenReconciliationResult handleSavepoint(
162+
TestingFlinkBlueGreenDeploymentController.BlueGreenReconciliationResult rs) throws Exception {
163+
var triggers = flinkService.getSavepointTriggers();
164+
triggers.clear();
165+
166+
rs = reconcile(rs.deployment);
167+
168+
// Simulating a pending savepoint
169+
triggers.put(rs.deployment.getStatus().getSavepointTriggerId(), false);
170+
171+
// Should be in SAVEPOINTING_BLUE state first
172+
assertEquals(
173+
FlinkBlueGreenDeploymentState.SAVEPOINTING_BLUE,
174+
rs.reconciledStatus.getBlueGreenState());
175+
assertTrue(rs.updateControl.isPatchStatus());
176+
assertTrue(rs.updateControl.getScheduleDelay().isPresent());
177+
178+
// This next reconciliation should continue waiting on the pending savepoint
179+
var rs2 = reconcile(rs.deployment);
180+
181+
assertTrue(rs2.updateControl.isNoUpdate());
182+
assertTrue(rs2.updateControl.getScheduleDelay().isPresent());
183+
184+
// Completing the savepoint
185+
triggers.put(rs.deployment.getStatus().getSavepointTriggerId(), true);
186+
187+
// This next reconciliation should move on to the next state
188+
rs = reconcile(rs.deployment);
189+
190+
assertEquals(
191+
FlinkBlueGreenDeploymentState.ACTIVE_BLUE,
192+
rs.reconciledStatus.getBlueGreenState());
193+
assertTrue(rs.updateControl.isPatchStatus());
194+
assertTrue(rs.updateControl.getScheduleDelay().isPresent());
195+
return rs;
196+
}
197+
151198
@ParameterizedTest
152199
@MethodSource("org.apache.flink.kubernetes.operator.TestUtils#flinkVersions")
153200
public void verifyFailureBeforeTransition(FlinkVersion flinkVersion) throws Exception {

0 commit comments

Comments
 (0)