Skip to content

Commit 8812c78

Browse files
committed
[FLINK-38033] Fix accidental upgrade snapshot dispose bug
1 parent 9c7795a commit 8812c78

File tree

2 files changed

+59
-32
lines changed

2 files changed

+59
-32
lines changed

flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractJobReconciler.java

Lines changed: 21 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -409,25 +409,30 @@ protected void setUpgradeSavepointPath(
409409
conf.get(KubernetesOperatorConfigOptions.OPERATOR_SAVEPOINT_FORMAT_TYPE)
410410
.name());
411411

412-
FlinkStateSnapshotUtils.createUpgradeSnapshotResource(
413-
conf,
414-
ctx.getOperatorConfig(),
415-
ctx.getKubernetesClient(),
416-
ctx.getResource(),
417-
savepointFormatType,
418-
savepointLocation);
412+
var snapshotCrOpt =
413+
FlinkStateSnapshotUtils.createUpgradeSnapshotResource(
414+
conf,
415+
ctx.getOperatorConfig(),
416+
ctx.getKubernetesClient(),
417+
ctx.getResource(),
418+
savepointFormatType,
419+
savepointLocation);
419420
var jobStatus = ctx.getResource().getStatus().getJobStatus();
420421
jobStatus.setUpgradeSavepointPath(savepointLocation);
421422

422-
// Register created savepoint in the now deprecated savepoint info and history
423-
var savepoint =
424-
new Savepoint(
425-
cancelTs.toEpochMilli(),
426-
savepointLocation,
427-
SnapshotTriggerType.UPGRADE,
428-
savepointFormatType,
429-
null);
430-
jobStatus.getSavepointInfo().updateLastSavepoint(savepoint);
423+
if (snapshotCrOpt.isEmpty()) {
424+
// Register created savepoint in the now deprecated savepoint info and history
425+
// only if snapshot CR was not created, otherwise it would be double recorded
426+
// and disposed immediately
427+
var savepoint =
428+
new Savepoint(
429+
cancelTs.toEpochMilli(),
430+
savepointLocation,
431+
SnapshotTriggerType.UPGRADE,
432+
savepointFormatType,
433+
null);
434+
jobStatus.getSavepointInfo().updateLastSavepoint(savepoint);
435+
}
431436
}
432437

433438
/**

flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconcilerTest.java

Lines changed: 38 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -90,6 +90,7 @@
9090
import org.junit.jupiter.api.Test;
9191
import org.junit.jupiter.api.function.ThrowingConsumer;
9292
import org.junit.jupiter.params.ParameterizedTest;
93+
import org.junit.jupiter.params.provider.Arguments;
9394
import org.junit.jupiter.params.provider.EnumSource;
9495
import org.junit.jupiter.params.provider.MethodSource;
9596
import org.junit.jupiter.params.provider.ValueSource;
@@ -112,6 +113,7 @@
112113
import java.util.function.Consumer;
113114
import java.util.function.Function;
114115
import java.util.function.Predicate;
116+
import java.util.stream.Stream;
115117

116118
import static org.apache.flink.api.common.JobStatus.FINISHED;
117119
import static org.apache.flink.api.common.JobStatus.RECONCILING;
@@ -137,6 +139,7 @@
137139
import static org.junit.jupiter.api.Assertions.assertNull;
138140
import static org.junit.jupiter.api.Assertions.assertTrue;
139141
import static org.junit.jupiter.api.Assertions.fail;
142+
import static org.junit.jupiter.params.provider.Arguments.arguments;
140143

141144
/**
142145
* @link JobStatusObserver unit tests
@@ -235,9 +238,12 @@ public void testSubmitAndCleanUpWithSavepointOnResource(FlinkVersion flinkVersio
235238
}
236239

237240
@ParameterizedTest
238-
@MethodSource("org.apache.flink.kubernetes.operator.TestUtils#flinkVersions")
239-
public void testUpgrade(FlinkVersion flinkVersion) throws Exception {
241+
@MethodSource("upgradeArgs")
242+
public void testUpgrade(FlinkVersion flinkVersion, boolean snapshotResource) throws Exception {
240243
FlinkDeployment deployment = TestUtils.buildApplicationCluster(flinkVersion);
244+
conf.set(SNAPSHOT_RESOURCE_ENABLED, snapshotResource);
245+
configManager.updateDefaultConfig(conf);
246+
operatorConfig = configManager.getOperatorConfiguration();
241247

242248
reconciler.reconcile(deployment, context);
243249
var runningJobs = flinkService.listJobs();
@@ -305,26 +311,35 @@ public void testUpgrade(FlinkVersion flinkVersion) throws Exception {
305311
assertEquals(0, flinkService.getRunningCount());
306312

307313
var spInfo = statefulUpgrade.getStatus().getJobStatus().getSavepointInfo();
308-
assertEquals("savepoint_0", spInfo.getLastSavepoint().getLocation());
309-
assertEquals(SnapshotTriggerType.UPGRADE, spInfo.getLastSavepoint().getTriggerType());
310-
assertEquals(
311-
spInfo.getLastSavepoint(),
312-
new LinkedList<>(spInfo.getSavepointHistory()).getLast());
314+
if (snapshotResource) {
315+
assertNull(spInfo.getLastSavepoint());
316+
} else {
317+
assertEquals("savepoint_0", spInfo.getLastSavepoint().getLocation());
318+
assertEquals(SnapshotTriggerType.UPGRADE, spInfo.getLastSavepoint().getTriggerType());
319+
assertEquals(
320+
spInfo.getLastSavepoint(),
321+
new LinkedList<>(spInfo.getSavepointHistory()).getLast());
322+
}
313323

314324
reconciler.reconcile(statefulUpgrade, context);
315325

316326
runningJobs = flinkService.listJobs();
317327
assertEquals(1, flinkService.getRunningCount());
318328
var snapshots = TestUtils.getFlinkStateSnapshotsForResource(kubernetesClient, deployment);
319-
assertThat(snapshots).isNotEmpty();
320-
assertThat(snapshots.get(0).getSpec().getSavepoint().getPath()).isEqualTo("savepoint_0");
321-
assertEquals(
322-
SnapshotTriggerType.UPGRADE.name(),
323-
snapshots
324-
.get(0)
325-
.getMetadata()
326-
.getLabels()
327-
.get(CrdConstants.LABEL_SNAPSHOT_TRIGGER_TYPE));
329+
if (snapshotResource) {
330+
assertThat(snapshots).isNotEmpty();
331+
assertThat(snapshots.get(0).getSpec().getSavepoint().getPath())
332+
.isEqualTo("savepoint_0");
333+
assertEquals(
334+
SnapshotTriggerType.UPGRADE.name(),
335+
snapshots
336+
.get(0)
337+
.getMetadata()
338+
.getLabels()
339+
.get(CrdConstants.LABEL_SNAPSHOT_TRIGGER_TYPE));
340+
} else {
341+
assertThat(snapshots).isEmpty();
342+
}
328343

329344
// Make sure jobId rotated on savepoint
330345
verifyNewJobId(runningJobs.get(0).f1, runningJobs.get(0).f2, jobId);
@@ -370,6 +385,13 @@ public void testUpgrade(FlinkVersion flinkVersion) throws Exception {
370385
verifyNewJobId(runningJobs.get(0).f1, runningJobs.get(0).f2, jobId);
371386
}
372387

388+
private static Stream<Arguments> upgradeArgs() {
389+
return Stream.of(
390+
arguments(FlinkVersion.v1_16, true),
391+
arguments(FlinkVersion.v1_20, true),
392+
arguments(FlinkVersion.v1_20, false));
393+
}
394+
373395
private void verifyJobId(
374396
FlinkDeployment deployment, JobStatusMessage status, Configuration conf, JobID jobId) {
375397
// jobId set by operator

0 commit comments

Comments
 (0)