Skip to content

Commit 3d91ba4

Browse files
committed
Optimized/simplified the reconciliation logic for first deployments. Clearer log statements.
1 parent ebab279 commit 3d91ba4

File tree

3 files changed

+84
-62
lines changed

3 files changed

+84
-62
lines changed

flink-kubernetes-operator-api/src/main/java/org/apache/flink/kubernetes/operator/api/spec/FlinkBlueGreenDeploymentConfigOptions.java

Lines changed: 19 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -29,23 +29,38 @@ public class FlinkBlueGreenDeploymentConfigOptions {
2929

3030
public static final String BLUE_GREEN_CONF_PREFIX = K8S_OP_CONF_PREFIX + "bluegreen.";
3131

32-
public static final int MIN_ABORT_GRACE_PERIOD_MS = 120000; // 2 mins
33-
3432
public static ConfigOptions.OptionBuilder operatorConfig(String key) {
3533
return ConfigOptions.key(BLUE_GREEN_CONF_PREFIX + key);
3634
}
3735

36+
/**
37+
* NOTE: Duration strings must be in the format "{length value}{time unit label}", e.g.
38+
* "123ms", "321 s". If no time unit label is specified, it will be considered as milliseconds.
39+
* There is no fallback to parsing the ISO-8601 duration format until Flink 2.x.
40+
*
41+
* <p>Supported time unit labels are:
42+
*
43+
* <ul>
44+
* <li>DAYS: "d", "day"
45+
* <li>HOURS: "h", "hour"
46+
* <li>MINUTES: "m", "min", "minute"
47+
* <li>SECONDS: "s", "sec", "second"
48+
* <li>MILLISECONDS: "ms", "milli", "millisecond"
49+
* <li>MICROSECONDS: "µs", "micro", "microsecond"
50+
* <li>NANOSECONDS: "ns", "nano", "nanosecond"
51+
* </ul>
52+
*/
3853
public static final ConfigOption<Duration> ABORT_GRACE_PERIOD =
3954
operatorConfig("abort.grace-period")
4055
.durationType()
41-
.defaultValue(Duration.ofMillis(MIN_ABORT_GRACE_PERIOD_MS))
56+
.defaultValue(Duration.ofMinutes(2))
4257
.withDescription(
4358
"The max time to wait in milliseconds for a deployment to become ready before aborting it. Cannot be smaller than 2 minutes.");
4459

4560
public static final ConfigOption<Duration> RECONCILIATION_RESCHEDULING_INTERVAL =
4661
operatorConfig("reconciliation.reschedule-interval")
4762
.durationType()
48-
.defaultValue(Duration.ofMillis(15000)) // 15 seconds
63+
.defaultValue(Duration.ofSeconds(15))
4964
.withDescription(
5065
"Configurable delay in milliseconds to use when the operator reschedules a reconciliation.");
5166

flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkBlueGreenDeploymentController.java

Lines changed: 63 additions & 50 deletions
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,6 @@
3232
import org.apache.flink.kubernetes.operator.service.FlinkResourceContextFactory;
3333
import org.apache.flink.util.Preconditions;
3434

35-
import com.fasterxml.jackson.core.JsonProcessingException;
3635
import io.fabric8.kubernetes.api.model.ObjectMeta;
3736
import io.fabric8.kubernetes.api.model.StatusDetails;
3837
import io.javaoperatorsdk.operator.api.config.informer.InformerEventSourceConfiguration;
@@ -55,7 +54,6 @@
5554

5655
import static org.apache.flink.kubernetes.operator.api.spec.FlinkBlueGreenDeploymentConfigOptions.ABORT_GRACE_PERIOD;
5756
import static org.apache.flink.kubernetes.operator.api.spec.FlinkBlueGreenDeploymentConfigOptions.DEPLOYMENT_DELETION_DELAY;
58-
import static org.apache.flink.kubernetes.operator.api.spec.FlinkBlueGreenDeploymentConfigOptions.MIN_ABORT_GRACE_PERIOD_MS;
5957
import static org.apache.flink.kubernetes.operator.api.spec.FlinkBlueGreenDeploymentConfigOptions.RECONCILIATION_RESCHEDULING_INTERVAL;
6058
import static org.apache.flink.kubernetes.operator.controller.FlinkBlueGreenDeploymentUtils.getConfigOption;
6159
import static org.apache.flink.kubernetes.operator.controller.FlinkBlueGreenDeploymentUtils.instantStrToMillis;
@@ -69,7 +67,7 @@ public class FlinkBlueGreenDeploymentController implements Reconciler<FlinkBlueG
6967

7068
private final FlinkResourceContextFactory ctxFactory;
7169

72-
public static int minimumAbortGracePeriodMs = MIN_ABORT_GRACE_PERIOD_MS; // 2 mins
70+
public static long minimumAbortGracePeriodMs = ABORT_GRACE_PERIOD.defaultValue().toMillis();
7371

7472
public FlinkBlueGreenDeploymentController(FlinkResourceContextFactory ctxFactory) {
7573
this.ctxFactory = ctxFactory;
@@ -150,8 +148,7 @@ public UpdateControl<FlinkBlueGreenDeployment> reconcile(
150148
private UpdateControl<FlinkBlueGreenDeployment> checkFirstDeployment(
151149
FlinkBlueGreenDeployment bgDeployment,
152150
Context<FlinkBlueGreenDeployment> josdkContext,
153-
FlinkBlueGreenDeploymentStatus deploymentStatus)
154-
throws JsonProcessingException {
151+
FlinkBlueGreenDeploymentStatus deploymentStatus) {
155152
if (deploymentStatus.getLastReconciledSpec() == null
156153
|| hasSpecChanged(bgDeployment.getSpec(), deploymentStatus)) {
157154
// Ack the change in the spec (setLastReconciledSpec)
@@ -172,10 +169,8 @@ private UpdateControl<FlinkBlueGreenDeployment> checkFirstDeployment(
172169
private static void setAbortTimestamp(
173170
FlinkBlueGreenDeployment bgDeployment,
174171
FlinkBlueGreenDeploymentStatus deploymentStatus) {
175-
Long abortGracePeriod = getConfigOption(bgDeployment, ABORT_GRACE_PERIOD).toMillis();
176-
abortGracePeriod = Math.max(abortGracePeriod, minimumAbortGracePeriodMs);
177172
deploymentStatus.setAbortTimestamp(
178-
millisToInstantStr(System.currentTimeMillis() + abortGracePeriod));
173+
millisToInstantStr(System.currentTimeMillis() + getAbortGracePeriod(bgDeployment)));
179174
}
180175

181176
private UpdateControl<FlinkBlueGreenDeployment> monitorTransition(
@@ -220,7 +215,12 @@ private UpdateControl<FlinkBlueGreenDeployment> monitorTransition(
220215

221216
if (isDeploymentReady(nextDeployment)) {
222217
return canDelete(
223-
bgDeployment, deploymentStatus, josdkContext, currentDeployment, nextState);
218+
bgDeployment,
219+
deploymentStatus,
220+
josdkContext,
221+
currentDeployment,
222+
nextDeployment,
223+
nextState);
224224
} else {
225225
return shouldAbort(
226226
bgDeployment,
@@ -237,18 +237,27 @@ private UpdateControl<FlinkBlueGreenDeployment> canDelete(
237237
FlinkBlueGreenDeploymentStatus deploymentStatus,
238238
Context<FlinkBlueGreenDeployment> josdkContext,
239239
FlinkDeployment currentDeployment,
240+
FlinkDeployment nextDeployment,
240241
FlinkBlueGreenDeploymentState nextState) {
241-
long deploymentDeletionDelayMs =
242-
Math.max(getConfigOption(bgDeployment, DEPLOYMENT_DELETION_DELAY).toMillis(), 0);
242+
// currentDeployment will be null in either of these cases:
243+
// - this is a first-time deployment
244+
// - the previous deployment has been successfully deleted
245+
// therefore, finalize right away
246+
if (currentDeployment == null) {
247+
deploymentStatus.setDeploymentReadyTimestamp(Instant.now().toString());
248+
return finalizeBlueGreenDeployment(bgDeployment, deploymentStatus, nextState);
249+
}
250+
251+
long deploymentDeletionDelayMs = getDeploymentDeletionDelay(bgDeployment);
243252

244253
long deploymentReadyTimestamp =
245254
instantStrToMillis(deploymentStatus.getDeploymentReadyTimestamp());
246255

247256
if (deploymentReadyTimestamp == 0) {
248257
LOG.info(
249-
"Deployment marked ready on {}, rescheduling reconciliation in {} ms.",
250-
System.currentTimeMillis(),
251-
deploymentDeletionDelayMs);
258+
"FlinkDeployment '{}' marked ready, rescheduling reconciliation in {} seconds.",
259+
nextDeployment.getMetadata().getName(),
260+
deploymentDeletionDelayMs / 1000);
252261
deploymentStatus.setDeploymentReadyTimestamp(Instant.now().toString());
253262
return patchStatusUpdateControl(bgDeployment, deploymentStatus, null, null)
254263
.rescheduleAfter(deploymentDeletionDelayMs);
@@ -257,11 +266,13 @@ private UpdateControl<FlinkBlueGreenDeployment> canDelete(
257266
var deletionTs = deploymentReadyTimestamp + deploymentDeletionDelayMs;
258267

259268
if (deletionTs < System.currentTimeMillis()) {
260-
return deleteAndFinalize(
261-
bgDeployment, deploymentStatus, josdkContext, currentDeployment, nextState);
269+
return deleteDeployment(currentDeployment, josdkContext);
262270
} else {
263271
long delay = deletionTs - System.currentTimeMillis();
264-
LOG.info("Rescheduling reconciliation (to delete) in {} ms.", delay);
272+
LOG.info(
273+
"Awaiting deletion delay for FlinkDeployment '{}', rescheduling reconciliation in {} seconds.",
274+
currentDeployment.getMetadata().getName(),
275+
delay / 1000);
265276
return UpdateControl.<FlinkBlueGreenDeployment>noUpdate().rescheduleAfter(delay);
266277
}
267278
}
@@ -315,41 +326,36 @@ private UpdateControl<FlinkBlueGreenDeployment> shouldAbort(
315326
} else {
316327
// RETRY
317328
var delay = abortTimestamp - System.currentTimeMillis();
318-
LOG.info("Deployment '{}' not ready yet, retrying in {} ms", deploymentName, delay);
329+
LOG.info(
330+
"FlinkDeployment '{}' not ready yet, retrying in {} seconds.",
331+
deploymentName,
332+
delay / 1000);
319333
return patchStatusUpdateControl(bgDeployment, deploymentStatus, null, null)
320334
.rescheduleAfter(delay);
321335
}
322336
}
323337

324-
private UpdateControl<FlinkBlueGreenDeployment> deleteAndFinalize(
338+
private UpdateControl<FlinkBlueGreenDeployment> finalizeBlueGreenDeployment(
325339
FlinkBlueGreenDeployment bgDeployment,
326340
FlinkBlueGreenDeploymentStatus deploymentStatus,
327-
Context<FlinkBlueGreenDeployment> josdkContext,
328-
FlinkDeployment currentDeployment,
329341
FlinkBlueGreenDeploymentState nextState) {
330342

331-
if (currentDeployment != null) {
332-
deleteDeployment(currentDeployment, josdkContext);
333-
return UpdateControl.<FlinkBlueGreenDeployment>noUpdate().rescheduleAfter(500);
334-
} else {
335-
LOG.info(
336-
"Finalizing deployment '{}' to {} state",
337-
bgDeployment.getMetadata().getName(),
338-
nextState);
339-
deploymentStatus.setDeploymentReadyTimestamp(millisToInstantStr(0));
340-
deploymentStatus.setAbortTimestamp(millisToInstantStr(0));
341-
return patchStatusUpdateControl(
342-
bgDeployment, deploymentStatus, nextState, JobStatus.RUNNING);
343-
}
343+
LOG.info(
344+
"Finalizing deployment '{}' to {} state",
345+
bgDeployment.getMetadata().getName(),
346+
nextState);
347+
deploymentStatus.setDeploymentReadyTimestamp(millisToInstantStr(0));
348+
deploymentStatus.setAbortTimestamp(millisToInstantStr(0));
349+
return patchStatusUpdateControl(
350+
bgDeployment, deploymentStatus, nextState, JobStatus.RUNNING);
344351
}
345352

346353
private UpdateControl<FlinkBlueGreenDeployment> checkAndInitiateDeployment(
347354
FlinkBlueGreenDeployment bgDeployment,
348355
FlinkBlueGreenDeployments deployments,
349356
FlinkBlueGreenDeploymentStatus deploymentStatus,
350357
DeploymentType currentDeploymentType,
351-
Context<FlinkBlueGreenDeployment> josdkContext)
352-
throws Exception {
358+
Context<FlinkBlueGreenDeployment> josdkContext) {
353359

354360
if (hasSpecChanged(bgDeployment.getSpec(), deploymentStatus)) {
355361

@@ -438,22 +444,15 @@ private UpdateControl<FlinkBlueGreenDeployment> initiateDeployment(
438444
FlinkBlueGreenDeploymentState nextState,
439445
Savepoint lastCheckpoint,
440446
Context<FlinkBlueGreenDeployment> josdkContext,
441-
boolean isFirstDeployment)
442-
throws JsonProcessingException {
447+
boolean isFirstDeployment) {
443448

444449
deploy(bgDeployment, nextDeploymentType, lastCheckpoint, josdkContext, isFirstDeployment);
445450

446451
setAbortTimestamp(bgDeployment, deploymentStatus);
447452

448-
long reconciliationReschedInterval =
449-
Math.max(
450-
getConfigOption(bgDeployment, RECONCILIATION_RESCHEDULING_INTERVAL)
451-
.toMillis(),
452-
0);
453-
454453
return patchStatusUpdateControl(
455454
bgDeployment, deploymentStatus, nextState, JobStatus.RECONCILING)
456-
.rescheduleAfter(reconciliationReschedInterval);
455+
.rescheduleAfter(getReconciliationReschedInterval(bgDeployment));
457456
}
458457

459458
private boolean isDeploymentReady(FlinkDeployment deployment) {
@@ -498,8 +497,6 @@ private void deploy(
498497

499498
// Deployment
500499
FlinkDeployment flinkDeployment = new FlinkDeployment();
501-
flinkDeployment.setApiVersion("flink.apache.org/v1beta1");
502-
flinkDeployment.setKind("FlinkDeployment");
503500
FlinkBlueGreenDeploymentSpec spec = bgDeployment.getSpec();
504501

505502
String childDeploymentName =
@@ -546,7 +543,7 @@ private void deploy(
546543
josdkContext.getClient().resource(flinkDeployment).createOrReplace();
547544
}
548545

549-
private static void deleteDeployment(
546+
private static UpdateControl<FlinkBlueGreenDeployment> deleteDeployment(
550547
FlinkDeployment currentDeployment, Context<FlinkBlueGreenDeployment> josdkContext) {
551548
String deploymentName = currentDeployment.getMetadata().getName();
552549
List<StatusDetails> deletedStatus =
@@ -562,10 +559,26 @@ private static void deleteDeployment(
562559
&& deletedStatus.get(0).getKind().equals("FlinkDeployment");
563560

564561
if (!deleted) {
565-
LOG.info("Deployment '" + deploymentName + "' not deleted, will retry");
562+
LOG.info("FlinkDeployment '{}' not deleted, will retry", deploymentName);
566563
} else {
567-
LOG.info("Deployment '" + deploymentName + "' deleted!");
564+
LOG.info("Deployment '{}' deleted!", deploymentName);
568565
}
566+
567+
return UpdateControl.<FlinkBlueGreenDeployment>noUpdate().rescheduleAfter(500);
568+
}
569+
570+
private long getReconciliationReschedInterval(FlinkBlueGreenDeployment bgDeployment) {
571+
return Math.max(
572+
getConfigOption(bgDeployment, RECONCILIATION_RESCHEDULING_INTERVAL).toMillis(), 0);
573+
}
574+
575+
private long getDeploymentDeletionDelay(FlinkBlueGreenDeployment bgDeployment) {
576+
return Math.max(getConfigOption(bgDeployment, DEPLOYMENT_DELETION_DELAY).toMillis(), 0);
577+
}
578+
579+
private static long getAbortGracePeriod(FlinkBlueGreenDeployment bgDeployment) {
580+
long abortGracePeriod = getConfigOption(bgDeployment, ABORT_GRACE_PERIOD).toMillis();
581+
return Math.max(abortGracePeriod, minimumAbortGracePeriodMs);
569582
}
570583

571584
public static void logAndThrow(String message) {

flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkBlueGreenDeploymentControllerTest.java

Lines changed: 2 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -354,13 +354,7 @@ public void verifyFailureBeforeFirstDeployment(FlinkVersion flinkVersion) throws
354354

355355
simulateSubmitAndSuccessfulJobStart(deploymentA);
356356

357-
// 2. Mark the Blue deployment ready
358-
rs = reconcile(rs.deployment);
359-
360-
// 3. Logic for the deployment to get deleted
361-
assertDeploymentDeleted(rs, DEFAULT_DELETION_DELAY_VALUE, bgSpecBefore);
362-
363-
// 4. Finalize the Blue deployment
357+
// 2. Mark the Blue deployment ready and finalize it
364358
var minReconciliationTs = System.currentTimeMillis() - 1;
365359
rs = reconcile(rs.deployment);
366360

@@ -374,7 +368,7 @@ public void verifyFailureBeforeFirstDeployment(FlinkVersion flinkVersion) throws
374368
FlinkBlueGreenDeploymentState.ACTIVE_BLUE,
375369
rs.reconciledStatus.getBlueGreenState());
376370

377-
// 5. Subsequent reconciliation calls = NO-OP
371+
// 3. Subsequent reconciliation calls = NO-OP
378372
var rs2 = reconcile(rs.deployment);
379373
assertTrue(rs2.updateControl.isNoUpdate());
380374
}

0 commit comments

Comments
 (0)