Skip to content

Commit d62260b

Browse files
committed
Addressing PR comments
1 parent 3698bb3 commit d62260b

File tree

8 files changed

+166
-74
lines changed

8 files changed

+166
-74
lines changed

flink-kubernetes-operator-api/src/main/java/org/apache/flink/kubernetes/operator/api/bluegreen/DeploymentType.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -30,9 +30,10 @@ public enum DeploymentType {
3030
/** Identifier for the second or "Green" deployment type. */
3131
GREEN;
3232

33+
public static final String LABEL_KEY = "flink/blue-green-deployment-type";
34+
3335
public static DeploymentType fromDeployment(FlinkDeployment flinkDeployment) {
34-
String typeAnnotation =
35-
flinkDeployment.getMetadata().getLabels().get(DeploymentType.class.getSimpleName());
36+
String typeAnnotation = flinkDeployment.getMetadata().getLabels().get(LABEL_KEY);
3637
return DeploymentType.valueOf(typeAnnotation);
3738
}
3839
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,54 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.flink.kubernetes.operator.api.spec;
19+
20+
import org.apache.flink.configuration.ConfigOption;
21+
import org.apache.flink.configuration.ConfigOptions;
22+
23+
/** Configuration options to be used by the Flink Blue/Green Deployments. */
24+
public class FlinkBlueGreenDeploymentConfigOptions {
25+
26+
public static final String BLUE_GREEN_CONF_PREFIX = "bluegreen.";
27+
28+
public static final int MIN_ABORT_GRACE_PERIOD_MS = 120000; // 2 mins
29+
30+
public static ConfigOptions.OptionBuilder operatorConfig(String key) {
31+
return ConfigOptions.key(BLUE_GREEN_CONF_PREFIX + key);
32+
}
33+
34+
public static final ConfigOption<Integer> ABORT_GRACE_PERIOD_MS =
35+
operatorConfig("abortGracePeriodMs")
36+
.intType()
37+
.defaultValue(0)
38+
.withDescription(
39+
"The max time to wait for a deployment to become ready before aborting it, in milliseconds. Cannot be smaller than 2 minutes.");
40+
41+
public static final ConfigOption<Integer> RECONCILIATION_RESCHEDULING_INTERVAL_MS =
42+
operatorConfig("reconciliationReschedulingIntervalMs")
43+
.intType()
44+
.defaultValue(15000) // 15 seconds
45+
.withDescription(
46+
"Configurable delay in milliseconds to use when the operator reschedules a reconciliation.");
47+
48+
public static final ConfigOption<Integer> DEPLOYMENT_DELETION_DELAY_MS =
49+
operatorConfig("deploymentDeletionDelayMs")
50+
.intType()
51+
.defaultValue(0)
52+
.withDescription(
53+
"Configurable delay before deleting a deployment after being marked done.");
54+
}

flink-kubernetes-operator-api/src/main/java/org/apache/flink/kubernetes/operator/api/spec/FlinkDeploymentTemplateSpec.java

Lines changed: 2 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -38,14 +38,8 @@ public class FlinkDeploymentTemplateSpec {
3838
@JsonProperty("metadata")
3939
private ObjectMeta metadata;
4040

41-
@JsonProperty("deploymentDeletionDelayMs")
42-
private int deploymentDeletionDelayMs;
43-
44-
@JsonProperty("abortGracePeriodMs")
45-
private int abortGracePeriodMs;
46-
47-
@JsonProperty("reconciliationReschedulingIntervalMs")
48-
private int reconciliationReschedulingIntervalMs;
41+
@JsonProperty("configuration")
42+
private Map<String, String> configuration;
4943

5044
@JsonProperty("spec")
5145
private FlinkDeploymentSpec spec;

flink-kubernetes-operator-api/src/main/java/org/apache/flink/kubernetes/operator/api/status/FlinkBlueGreenDeploymentStatus.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -45,13 +45,13 @@ public class FlinkBlueGreenDeploymentStatus {
4545
private String lastReconciledSpec;
4646

4747
/** Timestamp of last reconciliation. */
48-
private Long lastReconciledTimestamp;
48+
private String lastReconciledTimestamp;
4949

5050
/** Computed from abortGracePeriodMs, timestamp after which the deployment should be aborted. */
51-
private long abortTimestamp;
51+
private String abortTimestamp;
5252

5353
/** Timestamp when the deployment became READY/STABLE. Used to determine when to delete it. */
54-
private long deploymentReadyTimestamp;
54+
private String deploymentReadyTimestamp;
5555

5656
/** Information about the TaskManagers for the scale subresource. */
5757
private TaskManagerInfo taskManager;

flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkBlueGreenDeploymentController.java

Lines changed: 31 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -48,22 +48,30 @@
4848
import org.slf4j.Logger;
4949
import org.slf4j.LoggerFactory;
5050

51+
import java.time.Instant;
5152
import java.util.List;
5253
import java.util.Map;
5354
import java.util.Optional;
5455

56+
import static org.apache.flink.kubernetes.operator.api.spec.FlinkBlueGreenDeploymentConfigOptions.ABORT_GRACE_PERIOD_MS;
57+
import static org.apache.flink.kubernetes.operator.api.spec.FlinkBlueGreenDeploymentConfigOptions.DEPLOYMENT_DELETION_DELAY_MS;
58+
import static org.apache.flink.kubernetes.operator.api.spec.FlinkBlueGreenDeploymentConfigOptions.MIN_ABORT_GRACE_PERIOD_MS;
59+
import static org.apache.flink.kubernetes.operator.api.spec.FlinkBlueGreenDeploymentConfigOptions.RECONCILIATION_RESCHEDULING_INTERVAL_MS;
60+
import static org.apache.flink.kubernetes.operator.controller.FlinkBlueGreenDeploymentUtils.getConfigOption;
61+
import static org.apache.flink.kubernetes.operator.controller.FlinkBlueGreenDeploymentUtils.instantStrToMillis;
62+
import static org.apache.flink.kubernetes.operator.controller.FlinkBlueGreenDeploymentUtils.millisToInstantStr;
63+
5564
/** Controller that runs the main reconcile loop for Flink Blue/Green deployments. */
5665
@ControllerConfiguration
5766
public class FlinkBlueGreenDeploymentController
5867
implements Reconciler<FlinkBlueGreenDeployment>,
5968
EventSourceInitializer<FlinkBlueGreenDeployment> {
6069

6170
private static final Logger LOG = LoggerFactory.getLogger(FlinkDeploymentController.class);
62-
private static final int DEFAULT_RECONCILIATION_RESCHEDULING_INTERVAL_MS = 15000; // 15 secs
6371

6472
private final FlinkResourceContextFactory ctxFactory;
6573

66-
public static int minimumAbortGracePeriodMs = 120000; // 2 mins
74+
public static int minimumAbortGracePeriodMs = MIN_ABORT_GRACE_PERIOD_MS; // 2 mins
6775

6876
public FlinkBlueGreenDeploymentController(FlinkResourceContextFactory ctxFactory) {
6977
this.ctxFactory = ctxFactory;
@@ -161,9 +169,10 @@ private UpdateControl<FlinkBlueGreenDeployment> checkFirstDeployment(
161169
private static void setAbortTimestamp(
162170
FlinkBlueGreenDeployment bgDeployment,
163171
FlinkBlueGreenDeploymentStatus deploymentStatus) {
164-
int abortGracePeriod = bgDeployment.getSpec().getTemplate().getAbortGracePeriodMs();
172+
Integer abortGracePeriod = getConfigOption(bgDeployment, ABORT_GRACE_PERIOD_MS);
165173
abortGracePeriod = Math.max(abortGracePeriod, minimumAbortGracePeriodMs);
166-
deploymentStatus.setAbortTimestamp(System.currentTimeMillis() + abortGracePeriod);
174+
deploymentStatus.setAbortTimestamp(
175+
millisToInstantStr(System.currentTimeMillis() + abortGracePeriod));
167176
}
168177

169178
private UpdateControl<FlinkBlueGreenDeployment> monitorTransition(
@@ -227,21 +236,24 @@ private UpdateControl<FlinkBlueGreenDeployment> canDelete(
227236
FlinkDeployment currentDeployment,
228237
FlinkBlueGreenDeploymentState nextState) {
229238
int deploymentDeletionDelayMs =
230-
Math.max(bgDeployment.getSpec().getTemplate().getDeploymentDeletionDelayMs(), 0);
239+
Math.max(getConfigOption(bgDeployment, DEPLOYMENT_DELETION_DELAY_MS), 0);
240+
241+
long deploymentReadyTimestamp =
242+
instantStrToMillis(deploymentStatus.getDeploymentReadyTimestamp());
231243

232-
if (deploymentStatus.getDeploymentReadyTimestamp() == 0) {
244+
if (deploymentReadyTimestamp == 0) {
233245
LOG.info(
234246
"Deployment marked ready on "
235247
+ System.currentTimeMillis()
236248
+ ", rescheduling reconciliation in "
237249
+ deploymentDeletionDelayMs
238250
+ " ms.");
239-
deploymentStatus.setDeploymentReadyTimestamp(System.currentTimeMillis());
251+
deploymentStatus.setDeploymentReadyTimestamp(Instant.now().toString());
240252
return patchStatusUpdateControl(bgDeployment, deploymentStatus, null, null)
241253
.rescheduleAfter(deploymentDeletionDelayMs);
242254
}
243255

244-
var deletionTs = deploymentStatus.getDeploymentReadyTimestamp() + deploymentDeletionDelayMs;
256+
var deletionTs = deploymentReadyTimestamp + deploymentDeletionDelayMs;
245257

246258
if (deletionTs < System.currentTimeMillis()) {
247259
return deleteAndFinalize(
@@ -262,7 +274,7 @@ private UpdateControl<FlinkBlueGreenDeployment> shouldAbort(
262274
FlinkBlueGreenDeployments deployments) {
263275

264276
String deploymentName = nextDeployment.getMetadata().getName();
265-
long abortTimestamp = deploymentStatus.getAbortTimestamp();
277+
long abortTimestamp = instantStrToMillis(deploymentStatus.getAbortTimestamp());
266278

267279
if (abortTimestamp == 0) {
268280
throw new IllegalStateException("Unexpected abortTimestamp == 0");
@@ -314,15 +326,6 @@ private UpdateControl<FlinkBlueGreenDeployment> shouldAbort(
314326
}
315327
}
316328

317-
private static int getReconciliationReschedInterval(FlinkBlueGreenDeployment bgDeployment) {
318-
int reconciliationReschedInterval =
319-
bgDeployment.getSpec().getTemplate().getReconciliationReschedulingIntervalMs();
320-
if (reconciliationReschedInterval <= 0) {
321-
reconciliationReschedInterval = DEFAULT_RECONCILIATION_RESCHEDULING_INTERVAL_MS;
322-
}
323-
return reconciliationReschedInterval;
324-
}
325-
326329
private UpdateControl<FlinkBlueGreenDeployment> deleteAndFinalize(
327330
FlinkBlueGreenDeployment bgDeployment,
328331
FlinkBlueGreenDeploymentStatus deploymentStatus,
@@ -340,8 +343,8 @@ private UpdateControl<FlinkBlueGreenDeployment> deleteAndFinalize(
340343
+ "' to "
341344
+ nextState
342345
+ " state");
343-
deploymentStatus.setDeploymentReadyTimestamp(0);
344-
deploymentStatus.setAbortTimestamp(0);
346+
deploymentStatus.setDeploymentReadyTimestamp(millisToInstantStr(0));
347+
deploymentStatus.setAbortTimestamp(millisToInstantStr(0));
345348
return patchStatusUpdateControl(
346349
bgDeployment, deploymentStatus, nextState, JobStatus.RUNNING);
347350
}
@@ -407,7 +410,7 @@ private static void setLastReconciledSpec(
407410
FlinkBlueGreenDeploymentStatus deploymentStatus) {
408411
deploymentStatus.setLastReconciledSpec(
409412
SpecUtils.writeSpecAsJSON(bgDeployment.getSpec(), "spec"));
410-
deploymentStatus.setLastReconciledTimestamp(System.currentTimeMillis());
413+
deploymentStatus.setLastReconciledTimestamp(Instant.now().toString());
411414
}
412415

413416
private static Savepoint configureSavepoint(
@@ -449,9 +452,12 @@ private UpdateControl<FlinkBlueGreenDeployment> initiateDeployment(
449452

450453
setAbortTimestamp(bgDeployment, deploymentStatus);
451454

455+
var reconciliationReschedInterval =
456+
Math.max(getConfigOption(bgDeployment, RECONCILIATION_RESCHEDULING_INTERVAL_MS), 0);
457+
452458
return patchStatusUpdateControl(
453459
bgDeployment, deploymentStatus, nextState, JobStatus.RECONCILING)
454-
.rescheduleAfter(getReconciliationReschedInterval(bgDeployment));
460+
.rescheduleAfter(reconciliationReschedInterval);
455461
}
456462

457463
private boolean isDeploymentReady(FlinkDeployment deployment) {
@@ -481,7 +487,7 @@ private UpdateControl<FlinkBlueGreenDeployment> patchStatusUpdateControl(
481487
deploymentStatus.getJobStatus().setState(jobState);
482488
}
483489

484-
deploymentStatus.setLastReconciledTimestamp(System.currentTimeMillis());
490+
deploymentStatus.setLastReconciledTimestamp(Instant.now().toString());
485491
flinkBlueGreenDeployment.setStatus(deploymentStatus);
486492
return UpdateControl.patchStatus(flinkBlueGreenDeployment);
487493
}
@@ -491,8 +497,7 @@ private void deploy(
491497
DeploymentType deploymentType,
492498
Savepoint lastCheckpoint,
493499
Context<FlinkBlueGreenDeployment> josdkContext,
494-
boolean isFirstDeployment)
495-
throws JsonProcessingException {
500+
boolean isFirstDeployment) {
496501
ObjectMeta bgMeta = bgDeployment.getMetadata();
497502

498503
// Deployment
@@ -538,8 +543,7 @@ private void deploy(
538543
ObjectMeta flinkDeploymentMeta =
539544
FlinkBlueGreenDeploymentUtils.getDependentObjectMeta(bgDeployment);
540545
flinkDeploymentMeta.setName(childDeploymentName);
541-
flinkDeploymentMeta.setLabels(
542-
Map.of(deploymentType.getClass().getSimpleName(), deploymentType.toString()));
546+
flinkDeploymentMeta.setLabels(Map.of(DeploymentType.LABEL_KEY, deploymentType.toString()));
543547
flinkDeployment.setMetadata(flinkDeploymentMeta);
544548

545549
// Deploy

flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkBlueGreenDeploymentUtils.java

Lines changed: 21 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -17,13 +17,15 @@
1717

1818
package org.apache.flink.kubernetes.operator.controller;
1919

20+
import org.apache.flink.configuration.ConfigOption;
21+
import org.apache.flink.configuration.Configuration;
2022
import org.apache.flink.kubernetes.operator.api.FlinkBlueGreenDeployment;
2123
import org.apache.flink.kubernetes.operator.api.utils.SpecUtils;
2224

23-
import com.fasterxml.jackson.core.JsonProcessingException;
2425
import io.fabric8.kubernetes.api.model.ObjectMeta;
2526
import io.fabric8.kubernetes.api.model.OwnerReference;
2627

28+
import java.time.Instant;
2729
import java.util.List;
2830

2931
/** Utility methods for the FlinkBlueGreenDeploymentController. */
@@ -50,10 +52,26 @@ public static <T> T adjustNameReferences(
5052
String deploymentName,
5153
String childDeploymentName,
5254
String wrapperKey,
53-
Class<T> valueType)
54-
throws JsonProcessingException {
55+
Class<T> valueType) {
5556
String serializedSpec = SpecUtils.writeSpecAsJSON(spec, wrapperKey);
5657
String replacedSerializedSpec = serializedSpec.replace(deploymentName, childDeploymentName);
5758
return SpecUtils.readSpecFromJSON(replacedSerializedSpec, wrapperKey, valueType);
5859
}
60+
61+
public static String millisToInstantStr(long millis) {
62+
return Instant.ofEpochMilli(millis).toString();
63+
}
64+
65+
public static long instantStrToMillis(String instant) {
66+
if (instant == null) {
67+
return 0;
68+
}
69+
return Instant.parse(instant).toEpochMilli();
70+
}
71+
72+
public static <T> T getConfigOption(
73+
FlinkBlueGreenDeployment bgDeployment, ConfigOption<T> option) {
74+
return Configuration.fromMap(bgDeployment.getSpec().getTemplate().getConfiguration())
75+
.get(option);
76+
}
5977
}

0 commit comments

Comments
 (0)