Skip to content

Commit ebab279

Browse files
committed
Fixing configOption default value management and log message formatting
1 parent f6349bf commit ebab279

File tree

4 files changed

+58
-69
lines changed

4 files changed

+58
-69
lines changed

flink-kubernetes-operator-api/src/main/java/org/apache/flink/kubernetes/operator/api/spec/FlinkBlueGreenDeploymentConfigOptions.java

Lines changed: 19 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -20,35 +20,39 @@
2020
import org.apache.flink.configuration.ConfigOption;
2121
import org.apache.flink.configuration.ConfigOptions;
2222

23+
import java.time.Duration;
24+
2325
/** Configuration options to be used by the Flink Blue/Green Deployments. */
2426
public class FlinkBlueGreenDeploymentConfigOptions {
2527

26-
public static final String BLUE_GREEN_CONF_PREFIX = "bluegreen.";
28+
public static final String K8S_OP_CONF_PREFIX = "kubernetes.operator.";
29+
30+
public static final String BLUE_GREEN_CONF_PREFIX = K8S_OP_CONF_PREFIX + "bluegreen.";
2731

2832
public static final int MIN_ABORT_GRACE_PERIOD_MS = 120000; // 2 mins
2933

3034
public static ConfigOptions.OptionBuilder operatorConfig(String key) {
3135
return ConfigOptions.key(BLUE_GREEN_CONF_PREFIX + key);
3236
}
3337

34-
public static final ConfigOption<Integer> ABORT_GRACE_PERIOD_MS =
35-
operatorConfig("abortGracePeriodMs")
36-
.intType()
37-
.defaultValue(0)
38+
public static final ConfigOption<Duration> ABORT_GRACE_PERIOD =
39+
operatorConfig("abort.grace-period")
40+
.durationType()
41+
.defaultValue(Duration.ofMillis(MIN_ABORT_GRACE_PERIOD_MS))
3842
.withDescription(
39-
"The max time to wait for a deployment to become ready before aborting it, in milliseconds. Cannot be smaller than 2 minutes.");
43+
"The max time to wait in milliseconds for a deployment to become ready before aborting it. Cannot be smaller than 2 minutes.");
4044

41-
public static final ConfigOption<Integer> RECONCILIATION_RESCHEDULING_INTERVAL_MS =
42-
operatorConfig("reconciliationReschedulingIntervalMs")
43-
.intType()
44-
.defaultValue(15000) // 15 seconds
45+
public static final ConfigOption<Duration> RECONCILIATION_RESCHEDULING_INTERVAL =
46+
operatorConfig("reconciliation.reschedule-interval")
47+
.durationType()
48+
.defaultValue(Duration.ofMillis(15000)) // 15 seconds
4549
.withDescription(
4650
"Configurable delay in milliseconds to use when the operator reschedules a reconciliation.");
4751

48-
public static final ConfigOption<Integer> DEPLOYMENT_DELETION_DELAY_MS =
49-
operatorConfig("deploymentDeletionDelayMs")
50-
.intType()
51-
.defaultValue(0)
52+
public static final ConfigOption<Duration> DEPLOYMENT_DELETION_DELAY =
53+
operatorConfig("deployment-deletion.delay")
54+
.durationType()
55+
.defaultValue(Duration.ofMillis(0))
5256
.withDescription(
53-
"Configurable delay before deleting a deployment after being marked done.");
57+
"Configurable delay in milliseconds before deleting a deployment after being marked done.");
5458
}

flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkBlueGreenDeploymentController.java

Lines changed: 22 additions & 43 deletions
Original file line numberDiff line numberDiff line change
@@ -53,10 +53,10 @@
5353
import java.util.Map;
5454
import java.util.Optional;
5555

56-
import static org.apache.flink.kubernetes.operator.api.spec.FlinkBlueGreenDeploymentConfigOptions.ABORT_GRACE_PERIOD_MS;
57-
import static org.apache.flink.kubernetes.operator.api.spec.FlinkBlueGreenDeploymentConfigOptions.DEPLOYMENT_DELETION_DELAY_MS;
56+
import static org.apache.flink.kubernetes.operator.api.spec.FlinkBlueGreenDeploymentConfigOptions.ABORT_GRACE_PERIOD;
57+
import static org.apache.flink.kubernetes.operator.api.spec.FlinkBlueGreenDeploymentConfigOptions.DEPLOYMENT_DELETION_DELAY;
5858
import static org.apache.flink.kubernetes.operator.api.spec.FlinkBlueGreenDeploymentConfigOptions.MIN_ABORT_GRACE_PERIOD_MS;
59-
import static org.apache.flink.kubernetes.operator.api.spec.FlinkBlueGreenDeploymentConfigOptions.RECONCILIATION_RESCHEDULING_INTERVAL_MS;
59+
import static org.apache.flink.kubernetes.operator.api.spec.FlinkBlueGreenDeploymentConfigOptions.RECONCILIATION_RESCHEDULING_INTERVAL;
6060
import static org.apache.flink.kubernetes.operator.controller.FlinkBlueGreenDeploymentUtils.getConfigOption;
6161
import static org.apache.flink.kubernetes.operator.controller.FlinkBlueGreenDeploymentUtils.instantStrToMillis;
6262
import static org.apache.flink.kubernetes.operator.controller.FlinkBlueGreenDeploymentUtils.millisToInstantStr;
@@ -94,20 +94,6 @@ public List<EventSource<?, FlinkBlueGreenDeployment>> prepareEventSources(
9494
return eventSources;
9595
}
9696

97-
// @Override
98-
// public Map<String, EventSource> prepareEventSources(
99-
// EventSourceContext<FlinkBlueGreenDeployment> eventSourceContext) {
100-
// InformerConfiguration<FlinkDeployment> flinkDeploymentInformerConfig =
101-
// InformerConfiguration.from(FlinkDeployment.class, eventSourceContext)
102-
// .withSecondaryToPrimaryMapper(Mappers.fromOwnerReference())
103-
// .withNamespacesInheritedFromController(eventSourceContext)
104-
// .followNamespaceChanges(true)
105-
// .build();
106-
//
107-
// return EventSourceInitializer.nameEventSources(
108-
// new InformerEventSource<>(flinkDeploymentInformerConfig, eventSourceContext));
109-
// }
110-
11197
@Override
11298
public UpdateControl<FlinkBlueGreenDeployment> reconcile(
11399
FlinkBlueGreenDeployment bgDeployment, Context<FlinkBlueGreenDeployment> josdkContext)
@@ -186,7 +172,7 @@ private UpdateControl<FlinkBlueGreenDeployment> checkFirstDeployment(
186172
private static void setAbortTimestamp(
187173
FlinkBlueGreenDeployment bgDeployment,
188174
FlinkBlueGreenDeploymentStatus deploymentStatus) {
189-
Integer abortGracePeriod = getConfigOption(bgDeployment, ABORT_GRACE_PERIOD_MS);
175+
Long abortGracePeriod = getConfigOption(bgDeployment, ABORT_GRACE_PERIOD).toMillis();
190176
abortGracePeriod = Math.max(abortGracePeriod, minimumAbortGracePeriodMs);
191177
deploymentStatus.setAbortTimestamp(
192178
millisToInstantStr(System.currentTimeMillis() + abortGracePeriod));
@@ -252,19 +238,17 @@ private UpdateControl<FlinkBlueGreenDeployment> canDelete(
252238
Context<FlinkBlueGreenDeployment> josdkContext,
253239
FlinkDeployment currentDeployment,
254240
FlinkBlueGreenDeploymentState nextState) {
255-
int deploymentDeletionDelayMs =
256-
Math.max(getConfigOption(bgDeployment, DEPLOYMENT_DELETION_DELAY_MS), 0);
241+
long deploymentDeletionDelayMs =
242+
Math.max(getConfigOption(bgDeployment, DEPLOYMENT_DELETION_DELAY).toMillis(), 0);
257243

258244
long deploymentReadyTimestamp =
259245
instantStrToMillis(deploymentStatus.getDeploymentReadyTimestamp());
260246

261247
if (deploymentReadyTimestamp == 0) {
262248
LOG.info(
263-
"Deployment marked ready on "
264-
+ System.currentTimeMillis()
265-
+ ", rescheduling reconciliation in "
266-
+ deploymentDeletionDelayMs
267-
+ " ms.");
249+
"Deployment marked ready on {}, rescheduling reconciliation in {} ms.",
250+
System.currentTimeMillis(),
251+
deploymentDeletionDelayMs);
268252
deploymentStatus.setDeploymentReadyTimestamp(Instant.now().toString());
269253
return patchStatusUpdateControl(bgDeployment, deploymentStatus, null, null)
270254
.rescheduleAfter(deploymentDeletionDelayMs);
@@ -277,7 +261,7 @@ private UpdateControl<FlinkBlueGreenDeployment> canDelete(
277261
bgDeployment, deploymentStatus, josdkContext, currentDeployment, nextState);
278262
} else {
279263
long delay = deletionTs - System.currentTimeMillis();
280-
LOG.info("Rescheduling reconciliation (to delete) in " + delay + " ms.");
264+
LOG.info("Rescheduling reconciliation (to delete) in {} ms.", delay);
281265
return UpdateControl.<FlinkBlueGreenDeployment>noUpdate().rescheduleAfter(delay);
282266
}
283267
}
@@ -320,10 +304,9 @@ private UpdateControl<FlinkBlueGreenDeployment> shouldAbort(
320304
deploymentStatus.setBlueGreenState(previousState);
321305

322306
LOG.warn(
323-
"Aborting deployment '"
324-
+ deploymentName
325-
+ "', rolling B/G deployment back to "
326-
+ previousState);
307+
"Aborting deployment '{}', rolling B/G deployment back to {}",
308+
deploymentName,
309+
previousState);
327310

328311
// If the current running FlinkDeployment is not in RUNNING/STABLE,
329312
// we flag this Blue/Green as FAILING
@@ -332,12 +315,7 @@ private UpdateControl<FlinkBlueGreenDeployment> shouldAbort(
332315
} else {
333316
// RETRY
334317
var delay = abortTimestamp - System.currentTimeMillis();
335-
LOG.info(
336-
"Deployment '"
337-
+ deploymentName
338-
+ "' not ready yet, retrying in "
339-
+ delay
340-
+ " ms");
318+
LOG.info("Deployment '{}' not ready yet, retrying in {} ms", deploymentName, delay);
341319
return patchStatusUpdateControl(bgDeployment, deploymentStatus, null, null)
342320
.rescheduleAfter(delay);
343321
}
@@ -355,11 +333,9 @@ private UpdateControl<FlinkBlueGreenDeployment> deleteAndFinalize(
355333
return UpdateControl.<FlinkBlueGreenDeployment>noUpdate().rescheduleAfter(500);
356334
} else {
357335
LOG.info(
358-
"Finalizing deployment '"
359-
+ bgDeployment.getMetadata().getName()
360-
+ "' to "
361-
+ nextState
362-
+ " state");
336+
"Finalizing deployment '{}' to {} state",
337+
bgDeployment.getMetadata().getName(),
338+
nextState);
363339
deploymentStatus.setDeploymentReadyTimestamp(millisToInstantStr(0));
364340
deploymentStatus.setAbortTimestamp(millisToInstantStr(0));
365341
return patchStatusUpdateControl(
@@ -469,8 +445,11 @@ private UpdateControl<FlinkBlueGreenDeployment> initiateDeployment(
469445

470446
setAbortTimestamp(bgDeployment, deploymentStatus);
471447

472-
var reconciliationReschedInterval =
473-
Math.max(getConfigOption(bgDeployment, RECONCILIATION_RESCHEDULING_INTERVAL_MS), 0);
448+
long reconciliationReschedInterval =
449+
Math.max(
450+
getConfigOption(bgDeployment, RECONCILIATION_RESCHEDULING_INTERVAL)
451+
.toMillis(),
452+
0);
474453

475454
return patchStatusUpdateControl(
476455
bgDeployment, deploymentStatus, nextState, JobStatus.RECONCILING)

flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkBlueGreenDeploymentUtils.java

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727

2828
import java.time.Instant;
2929
import java.util.List;
30+
import java.util.Map;
3031

3132
/** Utility methods for the FlinkBlueGreenDeploymentController. */
3233
public class FlinkBlueGreenDeploymentUtils {
@@ -71,7 +72,12 @@ public static long instantStrToMillis(String instant) {
7172

7273
public static <T> T getConfigOption(
7374
FlinkBlueGreenDeployment bgDeployment, ConfigOption<T> option) {
74-
return Configuration.fromMap(bgDeployment.getSpec().getTemplate().getConfiguration())
75-
.get(option);
75+
Map<String, String> configuration = bgDeployment.getSpec().getTemplate().getConfiguration();
76+
77+
if (configuration == null) {
78+
return option.defaultValue();
79+
}
80+
81+
return Configuration.fromMap(configuration).get(option);
7682
}
7783
}

flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkBlueGreenDeploymentControllerTest.java

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -59,9 +59,9 @@
5959
import java.util.Map;
6060
import java.util.UUID;
6161

62-
import static org.apache.flink.kubernetes.operator.api.spec.FlinkBlueGreenDeploymentConfigOptions.ABORT_GRACE_PERIOD_MS;
63-
import static org.apache.flink.kubernetes.operator.api.spec.FlinkBlueGreenDeploymentConfigOptions.DEPLOYMENT_DELETION_DELAY_MS;
64-
import static org.apache.flink.kubernetes.operator.api.spec.FlinkBlueGreenDeploymentConfigOptions.RECONCILIATION_RESCHEDULING_INTERVAL_MS;
62+
import static org.apache.flink.kubernetes.operator.api.spec.FlinkBlueGreenDeploymentConfigOptions.ABORT_GRACE_PERIOD;
63+
import static org.apache.flink.kubernetes.operator.api.spec.FlinkBlueGreenDeploymentConfigOptions.DEPLOYMENT_DELETION_DELAY;
64+
import static org.apache.flink.kubernetes.operator.api.spec.FlinkBlueGreenDeploymentConfigOptions.RECONCILIATION_RESCHEDULING_INTERVAL;
6565
import static org.apache.flink.kubernetes.operator.api.utils.BaseTestUtils.SAMPLE_JAR;
6666
import static org.apache.flink.kubernetes.operator.api.utils.BaseTestUtils.TEST_DEPLOYMENT_NAME;
6767
import static org.apache.flink.kubernetes.operator.api.utils.BaseTestUtils.TEST_NAMESPACE;
@@ -183,9 +183,9 @@ public void verifyFailureDuringTransition(FlinkVersion flinkVersion) throws Exce
183183
var reconciliationReschedulingIntervalMs = 5000;
184184
Map<String, String> configuration =
185185
blueGreenDeployment.getSpec().getTemplate().getConfiguration();
186-
configuration.put(ABORT_GRACE_PERIOD_MS.key(), String.valueOf(abortGracePeriodMs));
186+
configuration.put(ABORT_GRACE_PERIOD.key(), String.valueOf(abortGracePeriodMs));
187187
configuration.put(
188-
RECONCILIATION_RESCHEDULING_INTERVAL_MS.key(),
188+
RECONCILIATION_RESCHEDULING_INTERVAL.key(),
189189
String.valueOf(reconciliationReschedulingIntervalMs));
190190

191191
var rs = executeBasicDeployment(flinkVersion, blueGreenDeployment, false);
@@ -526,7 +526,7 @@ private void simulateChangeInSpec(
526526

527527
if (customDeletionDelayMs > 0) {
528528
template.getConfiguration()
529-
.put(DEPLOYMENT_DELETION_DELAY_MS.key(), String.valueOf(customDeletionDelayMs));
529+
.put(DEPLOYMENT_DELETION_DELAY.key(), String.valueOf(customDeletionDelayMs));
530530
}
531531

532532
FlinkDeploymentSpec spec = template.getSpec();
@@ -648,10 +648,10 @@ private static FlinkBlueGreenDeploymentSpec getTestFlinkDeploymentSpec(FlinkVers
648648
.build();
649649

650650
Map<String, String> configuration = new HashMap<>();
651-
configuration.put(ABORT_GRACE_PERIOD_MS.key(), "1");
652-
configuration.put(RECONCILIATION_RESCHEDULING_INTERVAL_MS.key(), "500");
651+
configuration.put(ABORT_GRACE_PERIOD.key(), "1");
652+
configuration.put(RECONCILIATION_RESCHEDULING_INTERVAL.key(), "500");
653653
configuration.put(
654-
DEPLOYMENT_DELETION_DELAY_MS.key(), String.valueOf(DEFAULT_DELETION_DELAY_VALUE));
654+
DEPLOYMENT_DELETION_DELAY.key(), String.valueOf(DEFAULT_DELETION_DELAY_VALUE));
655655

656656
var flinkDeploymentTemplateSpec =
657657
FlinkDeploymentTemplateSpec.builder()

0 commit comments

Comments
 (0)