Skip to content

Commit 2b82a93

Browse files
committed
[FLINK-35831] Rotate jobId for both savepoint and stateless deploys
1 parent ffaa3dd commit 2b82a93

File tree

2 files changed

+28
-9
lines changed

2 files changed

+28
-9
lines changed

flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconciler.java

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -175,7 +175,8 @@ public void deploy(
175175
statusRecorder.patchAndCacheStatus(relatedResource, ctx.getKubernetesClient());
176176
}
177177

178-
setJobIdIfNecessary(spec, relatedResource, deployConfig, ctx.getKubernetesClient());
178+
setJobIdIfNecessary(
179+
relatedResource, deployConfig, ctx.getKubernetesClient(), requireHaMetadata);
179180

180181
eventRecorder.triggerEvent(
181182
relatedResource,
@@ -193,10 +194,10 @@ public void deploy(
193194
}
194195

195196
private void setJobIdIfNecessary(
196-
FlinkDeploymentSpec spec,
197197
FlinkDeployment resource,
198198
Configuration deployConfig,
199-
KubernetesClient client) {
199+
KubernetesClient client,
200+
boolean lastStateDeploy) {
200201
// The jobId assigned by Flink would be constant,
201202
// overwrite to avoid checkpoint path conflicts.
202203
// https://issues.apache.org/jira/browse/FLINK-19358
@@ -208,9 +209,8 @@ private void setJobIdIfNecessary(
208209
}
209210

210211
var status = resource.getStatus();
211-
// generate jobId initially or rotate on every deployment when mode is stateless
212-
if (status.getJobStatus().getJobId() == null
213-
|| spec.getJob().getUpgradeMode() == UpgradeMode.STATELESS) {
212+
// Rotate job id when not last-state deployment
213+
if (status.getJobStatus().getJobId() == null || !lastStateDeploy) {
214214
String jobId = JobID.generate().toHexString();
215215
// record before first deployment to ensure we use it on any retry
216216
status.getJobStatus().setJobId(jobId);

flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconcilerTest.java

Lines changed: 22 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -230,7 +230,15 @@ public void testUpgrade(FlinkVersion flinkVersion) throws Exception {
230230
.isFirstDeployment());
231231

232232
JobID jobId = runningJobs.get(0).f1.getJobId();
233-
verifyJobId(deployment, runningJobs.get(0).f1, runningJobs.get(0).f2, jobId);
233+
234+
// Last state upgrade
235+
FlinkDeployment lastStateUpgrade = ReconciliationUtils.clone(deployment);
236+
getJobSpec(lastStateUpgrade).setUpgradeMode(UpgradeMode.LAST_STATE);
237+
lastStateUpgrade.getSpec().setRestartNonce(1234L);
238+
reconciler.reconcile(deployment, context);
239+
reconciler.reconcile(deployment, context);
240+
// Make sure jobId is NOT rotated on a last-state upgrade (it must still match the previous jobId)
241+
verifyJobId(lastStateUpgrade, runningJobs.get(0).f1, runningJobs.get(0).f2, jobId);
234242

235243
// Test stateless upgrade
236244
FlinkDeployment statelessUpgrade = ReconciliationUtils.clone(deployment);
@@ -284,7 +292,10 @@ public void testUpgrade(FlinkVersion flinkVersion) throws Exception {
284292
SnapshotTriggerType.UPGRADE,
285293
getSavepointInfo(statefulUpgrade).getLastSavepoint().getTriggerType());
286294
assertEquals(SnapshotStatus.SUCCEEDED, getLastSnapshotStatus(statefulUpgrade, SAVEPOINT));
287-
verifyJobId(deployment, runningJobs.get(0).f1, runningJobs.get(0).f2, jobId);
295+
296+
// Make sure jobId rotated on savepoint
297+
verifyNewJobId(runningJobs.get(0).f1, runningJobs.get(0).f2, jobId);
298+
jobId = runningJobs.get(0).f1.getJobId();
288299

289300
getJobSpec(deployment).setUpgradeMode(UpgradeMode.LAST_STATE);
290301
deployment.getSpec().setRestartNonce(100L);
@@ -325,7 +336,8 @@ public void testUpgrade(FlinkVersion flinkVersion) throws Exception {
325336

326337
assertEquals(1, flinkService.getRunningCount());
327338
assertEquals("finished_sp", runningJobs.get(0).f0);
328-
verifyJobId(deployment, runningJobs.get(0).f1, runningJobs.get(0).f2, jobId);
339+
// Make sure jobId rotated on savepoint
340+
verifyNewJobId(runningJobs.get(0).f1, runningJobs.get(0).f2, jobId);
329341
}
330342

331343
private void verifyJobId(
@@ -335,6 +347,13 @@ private void verifyJobId(
335347
assertEquals(conf.get(PipelineOptionsInternal.PIPELINE_FIXED_JOB_ID), jobId.toHexString());
336348
}
337349

350+
private void verifyNewJobId(JobStatusMessage status, Configuration conf, JobID jobId) {
351+
assertNotEquals(jobId.toHexString(), status.getJobId());
352+
assertEquals(
353+
conf.get(PipelineOptionsInternal.PIPELINE_FIXED_JOB_ID),
354+
status.getJobId().toHexString());
355+
}
356+
338357
@NotNull
339358
private static Savepoint savepointFromSavepointInfo(
340359
SavepointInfo savepointInfo, Long savepointTriggerNonce) {

0 commit comments

Comments
 (0)