Skip to content

Commit 1df7747

Browse files
committed
Simplifying the retry/abort logic.
1 parent 6ecc28c commit 1df7747

File tree

6 files changed

+105
-77
lines changed

6 files changed

+105
-77
lines changed

flink-kubernetes-operator-api/src/main/java/org/apache/flink/kubernetes/operator/api/spec/FlinkDeploymentTemplateSpec.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -43,8 +43,8 @@ public class FlinkDeploymentTemplateSpec {
4343
@JsonProperty("deploymentDeletionDelayMs")
4444
private int deploymentDeletionDelayMs;
4545

46-
@JsonProperty("maxNumRetries")
47-
private int maxNumRetries;
46+
@JsonProperty("abortGracePeriodMs")
47+
private int abortGracePeriodMs;
4848

4949
@JsonProperty("reconciliationReschedulingIntervalMs")
5050
private int reconciliationReschedulingIntervalMs;

flink-kubernetes-operator-api/src/main/java/org/apache/flink/kubernetes/operator/api/status/FlinkBlueGreenDeploymentStatus.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -47,8 +47,8 @@ public class FlinkBlueGreenDeploymentStatus {
4747
/** Timestamp of last reconciliation. */
4848
private Long lastReconciledTimestamp;
4949

50-
/** Current number of retries. */
51-
private int numRetries;
50+
/** Timestamp (epoch millis) after which the deployment should be aborted; computed from abortGracePeriodMs. */
51+
private long abortTimestamp;
5252

5353
/** Timestamp when the deployment became READY/STABLE. Used to determine when to delete it. */
5454
private long deploymentReadyTimestamp;

flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkBlueGreenDeploymentController.java

Lines changed: 79 additions & 55 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
import org.apache.flink.kubernetes.operator.api.bluegreen.DeploymentType;
2525
import org.apache.flink.kubernetes.operator.api.lifecycle.ResourceLifecycleState;
2626
import org.apache.flink.kubernetes.operator.api.spec.FlinkBlueGreenDeploymentSpec;
27+
import org.apache.flink.kubernetes.operator.api.spec.JobState;
2728
import org.apache.flink.kubernetes.operator.api.status.FlinkBlueGreenDeploymentState;
2829
import org.apache.flink.kubernetes.operator.api.status.FlinkBlueGreenDeploymentStatus;
2930
import org.apache.flink.kubernetes.operator.api.status.Savepoint;
@@ -61,20 +62,19 @@
6162
import java.util.stream.Collectors;
6263
import java.util.stream.Stream;
6364

64-
import static io.javaoperatorsdk.operator.api.reconciler.UpdateControl.noUpdate;
65-
6665
/** Controller that runs the main reconcile loop for Flink Blue/Green deployments. */
6766
@ControllerConfiguration
6867
public class FlinkBlueGreenDeploymentController
6968
implements Reconciler<FlinkBlueGreenDeployment>,
7069
EventSourceInitializer<FlinkBlueGreenDeployment> {
7170

7271
// Bind the logger to this controller's own class so its log output is attributed correctly.
private static final Logger LOG =
        LoggerFactory.getLogger(FlinkBlueGreenDeploymentController.class);
73-
private static final int DEFAULT_MAX_NUM_RETRIES = 10;
74-
private static final int DEFAULT_RECONCILIATION_RESCHEDULING_INTERVAL_MS = 15000;
72+
private static final int DEFAULT_RECONCILIATION_RESCHEDULING_INTERVAL_MS = 15000; // 15 secs
7573

7674
private final FlinkResourceContextFactory ctxFactory;
7775

76+
public static int minimumAbortGracePeriodMs = 120000; // 2 mins
77+
7878
public FlinkBlueGreenDeploymentController(FlinkResourceContextFactory ctxFactory) {
7979
this.ctxFactory = ctxFactory;
8080
}
@@ -95,17 +95,16 @@ public Map<String, EventSource> prepareEventSources(
9595

9696
@Override
9797
public UpdateControl<FlinkBlueGreenDeployment> reconcile(
98-
FlinkBlueGreenDeployment flinkBlueGreenDeployment,
99-
Context<FlinkBlueGreenDeployment> josdkContext)
98+
FlinkBlueGreenDeployment bgDeployment, Context<FlinkBlueGreenDeployment> josdkContext)
10099
throws Exception {
101100

102-
FlinkBlueGreenDeploymentStatus deploymentStatus = flinkBlueGreenDeployment.getStatus();
101+
FlinkBlueGreenDeploymentStatus deploymentStatus = bgDeployment.getStatus();
103102

104103
if (deploymentStatus == null) {
105104
deploymentStatus = new FlinkBlueGreenDeploymentStatus();
106-
setLastReconciledSpec(flinkBlueGreenDeployment, deploymentStatus);
105+
setLastReconciledSpec(bgDeployment, deploymentStatus);
107106
return initiateDeployment(
108-
flinkBlueGreenDeployment,
107+
bgDeployment,
109108
deploymentStatus,
110109
DeploymentType.BLUE,
111110
FlinkBlueGreenDeploymentState.TRANSITIONING_TO_BLUE,
@@ -119,38 +118,46 @@ public UpdateControl<FlinkBlueGreenDeployment> reconcile(
119118
switch (deploymentStatus.getBlueGreenState()) {
120119
case ACTIVE_BLUE:
121120
return checkAndInitiateDeployment(
122-
flinkBlueGreenDeployment,
121+
bgDeployment,
123122
deployments,
124123
deploymentStatus,
125124
DeploymentType.BLUE,
126125
josdkContext);
127126
case ACTIVE_GREEN:
128127
return checkAndInitiateDeployment(
129-
flinkBlueGreenDeployment,
128+
bgDeployment,
130129
deployments,
131130
deploymentStatus,
132131
DeploymentType.GREEN,
133132
josdkContext);
134133
case TRANSITIONING_TO_BLUE:
135134
return monitorTransition(
136-
flinkBlueGreenDeployment,
135+
bgDeployment,
137136
deployments,
138137
deploymentStatus,
139138
DeploymentType.GREEN,
140139
josdkContext);
141140
case TRANSITIONING_TO_GREEN:
142141
return monitorTransition(
143-
flinkBlueGreenDeployment,
142+
bgDeployment,
144143
deployments,
145144
deploymentStatus,
146145
DeploymentType.BLUE,
147146
josdkContext);
148147
default:
149-
return noUpdate();
148+
return UpdateControl.noUpdate();
150149
}
151150
}
152151
}
153152

153+
private static void setAbortTimestamp(
154+
FlinkBlueGreenDeployment bgDeployment,
155+
FlinkBlueGreenDeploymentStatus deploymentStatus) {
156+
int abortGracePeriod = bgDeployment.getSpec().getTemplate().getAbortGracePeriodMs();
157+
abortGracePeriod = Math.max(abortGracePeriod, minimumAbortGracePeriodMs);
158+
deploymentStatus.setAbortTimestamp(System.currentTimeMillis() + abortGracePeriod);
159+
}
160+
154161
private UpdateControl<FlinkBlueGreenDeployment> monitorTransition(
155162
FlinkBlueGreenDeployment bgDeployment,
156163
FlinkBlueGreenDeployments deployments,
@@ -159,7 +166,7 @@ private UpdateControl<FlinkBlueGreenDeployment> monitorTransition(
159166
Context<FlinkBlueGreenDeployment> josdkContext)
160167
throws JsonProcessingException {
161168

162-
if (hasSpecChanged(bgDeployment.getSpec(), deploymentStatus, currentDeploymentType)) {
169+
if (hasSpecChanged(bgDeployment.getSpec(), deploymentStatus)) {
163170
// this means the spec was changed during transition,
164171
// ignore the new change, revert the spec and log as warning
165172
bgDeployment.setSpec(
@@ -196,7 +203,7 @@ private UpdateControl<FlinkBlueGreenDeployment> monitorTransition(
196203
return canDelete(
197204
bgDeployment, deploymentStatus, josdkContext, currentDeployment, nextState);
198205
} else {
199-
return retryOrAbort(
206+
return shouldAbort(
200207
bgDeployment, deploymentStatus, josdkContext, nextDeployment, nextState);
201208
}
202209
}
@@ -234,41 +241,56 @@ private UpdateControl<FlinkBlueGreenDeployment> canDelete(
234241
}
235242
}
236243

237-
private UpdateControl<FlinkBlueGreenDeployment> retryOrAbort(
244+
private UpdateControl<FlinkBlueGreenDeployment> shouldAbort(
238245
FlinkBlueGreenDeployment bgDeployment,
239246
FlinkBlueGreenDeploymentStatus deploymentStatus,
240247
Context<FlinkBlueGreenDeployment> josdkContext,
241248
FlinkDeployment nextDeployment,
242249
FlinkBlueGreenDeploymentState nextState) {
243-
int maxNumRetries = bgDeployment.getSpec().getTemplate().getMaxNumRetries();
244-
if (maxNumRetries <= 0) {
245-
maxNumRetries = DEFAULT_MAX_NUM_RETRIES;
250+
251+
String deploymentName = nextDeployment.getMetadata().getName();
252+
long abortTimestamp = deploymentStatus.getAbortTimestamp();
253+
254+
if (abortTimestamp == 0) {
255+
throw new IllegalStateException("Unexpected abortTimestamp == 0");
246256
}
247257

248-
if (deploymentStatus.getNumRetries() >= maxNumRetries) {
258+
if (abortTimestamp < System.currentTimeMillis()) {
249259
// ABORT
250260
// Suspend the nextDeployment (FlinkDeployment)
251-
nextDeployment.getStatus().getJobStatus().setState(JobStatus.SUSPENDED);
252-
josdkContext.getClient().resource(nextDeployment).replace();
261+
nextDeployment.getSpec().getJob().setState(JobState.SUSPENDED);
262+
josdkContext.getClient().resource(nextDeployment).update();
253263

254264
// We indicate this Blue/Green deployment is no longer Transitioning
255265
// and rollback the state value
256-
deploymentStatus.setBlueGreenState(
266+
var previousState =
257267
nextState == FlinkBlueGreenDeploymentState.ACTIVE_BLUE
258268
? FlinkBlueGreenDeploymentState.ACTIVE_GREEN
259-
: FlinkBlueGreenDeploymentState.ACTIVE_BLUE);
269+
: FlinkBlueGreenDeploymentState.ACTIVE_BLUE;
270+
271+
deploymentStatus.setBlueGreenState(previousState);
272+
273+
LOG.warn(
274+
"Aborting deployment '"
275+
+ deploymentName
276+
+ "', rolling B/G deployment back to "
277+
+ previousState);
260278

261279
// The transition did not complete before the abort timestamp,
262280
// so we flag this Blue/Green as FAILING
263281
return patchStatusUpdateControl(
264282
bgDeployment, deploymentStatus, null, JobStatus.FAILING);
265283
} else {
266284
// RETRY
267-
deploymentStatus.setNumRetries(deploymentStatus.getNumRetries() + 1);
268-
269-
LOG.info("Deployment " + nextDeployment.getMetadata().getName() + " not ready yet");
285+
var delay = abortTimestamp - System.currentTimeMillis();
286+
LOG.info(
287+
"Deployment '"
288+
+ deploymentName
289+
+ "' not ready yet, retrying in "
290+
+ delay
291+
+ " ms");
270292
return patchStatusUpdateControl(bgDeployment, deploymentStatus, null, null)
271-
.rescheduleAfter(getReconciliationReschedInterval(bgDeployment));
293+
.rescheduleAfter(delay);
272294
}
273295
}
274296

@@ -290,34 +312,39 @@ private UpdateControl<FlinkBlueGreenDeployment> deleteAndFinalize(
290312

291313
if (currentDeployment != null) {
292314
deleteDeployment(currentDeployment, josdkContext);
293-
return noUpdate();
315+
return UpdateControl.<FlinkBlueGreenDeployment>noUpdate().rescheduleAfter(500);
294316
} else {
317+
LOG.info(
318+
"Finalizing deployment '"
319+
+ bgDeployment.getMetadata().getName()
320+
+ "' to "
321+
+ nextState
322+
+ " state");
295323
deploymentStatus.setDeploymentReadyTimestamp(0);
324+
deploymentStatus.setAbortTimestamp(0);
296325
return patchStatusUpdateControl(
297326
bgDeployment, deploymentStatus, nextState, JobStatus.RUNNING);
298327
}
299328
}
300329

301330
private UpdateControl<FlinkBlueGreenDeployment> checkAndInitiateDeployment(
302-
FlinkBlueGreenDeployment flinkBlueGreenDeployment,
331+
FlinkBlueGreenDeployment bgDeployment,
303332
FlinkBlueGreenDeployments deployments,
304333
FlinkBlueGreenDeploymentStatus deploymentStatus,
305334
DeploymentType currentDeploymentType,
306335
Context<FlinkBlueGreenDeployment> josdkContext)
307336
throws Exception {
308337

309-
if (hasSpecChanged(
310-
flinkBlueGreenDeployment.getSpec(), deploymentStatus, currentDeploymentType)) {
338+
if (hasSpecChanged(bgDeployment.getSpec(), deploymentStatus)) {
311339

312340
// Ack the change in the spec (setLastReconciledSpec)
313-
setLastReconciledSpec(flinkBlueGreenDeployment, deploymentStatus);
341+
setLastReconciledSpec(bgDeployment, deploymentStatus);
314342

315343
FlinkDeployment currentFlinkDeployment =
316344
DeploymentType.BLUE == currentDeploymentType
317345
? deployments.getFlinkDeploymentBlue()
318346
: deployments.getFlinkDeploymentGreen();
319347

320-
// spec, report the error and abort
321348
if (isDeploymentReady(currentFlinkDeployment, josdkContext, deploymentStatus)) {
322349

323350
DeploymentType nextDeploymentType = DeploymentType.BLUE;
@@ -335,7 +362,7 @@ private UpdateControl<FlinkBlueGreenDeployment> checkAndInitiateDeployment(
335362
Savepoint lastCheckpoint = configureSavepoint(resourceContext);
336363

337364
return initiateDeployment(
338-
flinkBlueGreenDeployment,
365+
bgDeployment,
339366
deploymentStatus,
340367
nextDeploymentType,
341368
nextState,
@@ -347,19 +374,19 @@ private UpdateControl<FlinkBlueGreenDeployment> checkAndInitiateDeployment(
347374
// we flag this Blue/Green as FAILING
348375
if (deploymentStatus.getJobStatus().getState() != JobStatus.FAILING) {
349376
return patchStatusUpdateControl(
350-
flinkBlueGreenDeployment, deploymentStatus, null, JobStatus.FAILING);
377+
bgDeployment, deploymentStatus, null, JobStatus.FAILING);
351378
}
352379
}
353380
}
354381

355-
return noUpdate();
382+
return UpdateControl.noUpdate();
356383
}
357384

358385
private static void setLastReconciledSpec(
359-
FlinkBlueGreenDeployment flinkBlueGreenDeployment,
386+
FlinkBlueGreenDeployment bgDeployment,
360387
FlinkBlueGreenDeploymentStatus deploymentStatus) {
361388
deploymentStatus.setLastReconciledSpec(
362-
SpecUtils.serializeObject(flinkBlueGreenDeployment.getSpec(), "spec"));
389+
SpecUtils.serializeObject(bgDeployment.getSpec(), "spec"));
363390
deploymentStatus.setLastReconciledTimestamp(System.currentTimeMillis());
364391
}
365392

@@ -444,7 +471,7 @@ private static Savepoint configureSavepoint(
444471
}
445472

446473
private UpdateControl<FlinkBlueGreenDeployment> initiateDeployment(
447-
FlinkBlueGreenDeployment flinkBlueGreenDeployment,
474+
FlinkBlueGreenDeployment bgDeployment,
448475
FlinkBlueGreenDeploymentStatus deploymentStatus,
449476
DeploymentType nextDeploymentType,
450477
FlinkBlueGreenDeploymentState nextState,
@@ -453,15 +480,12 @@ private UpdateControl<FlinkBlueGreenDeployment> initiateDeployment(
453480
boolean isFirstDeployment)
454481
throws JsonProcessingException {
455482

456-
deploy(
457-
flinkBlueGreenDeployment,
458-
nextDeploymentType,
459-
lastCheckpoint,
460-
josdkContext,
461-
isFirstDeployment);
483+
deploy(bgDeployment, nextDeploymentType, lastCheckpoint, josdkContext, isFirstDeployment);
462484

463-
return patchStatusUpdateControl(flinkBlueGreenDeployment, deploymentStatus, nextState, null)
464-
.rescheduleAfter(getReconciliationReschedInterval(flinkBlueGreenDeployment));
485+
setAbortTimestamp(bgDeployment, deploymentStatus);
486+
487+
return patchStatusUpdateControl(bgDeployment, deploymentStatus, nextState, null)
488+
.rescheduleAfter(getReconciliationReschedInterval(bgDeployment));
465489
}
466490

467491
private boolean isDeploymentReady(
@@ -508,9 +532,7 @@ private static Stream<PodResource> getDeploymentPods(
508532
}
509533

510534
private boolean hasSpecChanged(
511-
FlinkBlueGreenDeploymentSpec newSpec,
512-
FlinkBlueGreenDeploymentStatus deploymentStatus,
513-
DeploymentType deploymentType) {
535+
FlinkBlueGreenDeploymentSpec newSpec, FlinkBlueGreenDeploymentStatus deploymentStatus) {
514536

515537
String lastReconciledSpec = deploymentStatus.getLastReconciledSpec();
516538
String newSpecSerialized = SpecUtils.serializeObject(newSpec, "spec");
@@ -582,21 +604,23 @@ private void deploy(
582604

583605
private static void deleteDeployment(
584606
FlinkDeployment currentDeployment, Context<FlinkBlueGreenDeployment> josdkContext) {
607+
String deploymentName = currentDeployment.getMetadata().getName();
585608
List<StatusDetails> deletedStatus =
586609
josdkContext
587610
.getClient()
588611
.resources(FlinkDeployment.class)
589612
.inNamespace(currentDeployment.getMetadata().getNamespace())
590-
.withName(currentDeployment.getMetadata().getName())
613+
.withName(deploymentName)
591614
.delete();
592615

593616
boolean deleted =
594617
deletedStatus.size() == 1
595618
&& deletedStatus.get(0).getKind().equals("FlinkDeployment");
619+
596620
if (!deleted) {
597-
LOG.info("Deployment not deleted, will retry");
621+
LOG.info("Deployment '" + deploymentName + "' not deleted, will retry");
598622
} else {
599-
LOG.info("Deployment deleted!");
623+
LOG.info("Deployment '" + deploymentName + "' deleted!");
600624
}
601625
}
602626

0 commit comments

Comments
 (0)