Skip to content

Commit a72c5b6

Browse files
committed
[FLINK-35414] Rework last-state upgrade mode to support job cancellation as suspend mechanism
1 parent 6a426b2 commit a72c5b6

File tree

43 files changed

+1205
-851
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

43 files changed

+1205
-851
lines changed

docs/content/docs/custom-resource/reference.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -465,7 +465,7 @@ This serves as a full reference for FlinkDeployment and FlinkSessionJob custom r
465465

466466
| Parameter | Type | Docs |
467467
| ----------| ---- | ---- |
468-
| lastSavepoint | org.apache.flink.kubernetes.operator.api.status.Savepoint | Last completed savepoint by the operator for manual and periodic snapshots. Only used if FlinkStateSnapshot resources are disabled. |
468+
| lastSavepoint | org.apache.flink.kubernetes.operator.api.status.Savepoint | Last completed savepoint by the operator. |
469469
| triggerId | java.lang.String | Trigger id of a pending savepoint operation. |
470470
| triggerTimestamp | java.lang.Long | Trigger timestamp of a pending savepoint operation. |
471471
| triggerType | org.apache.flink.kubernetes.operator.api.status.SnapshotTriggerType | Savepoint trigger mechanism. |

docs/layouts/shortcodes/generated/dynamic_section.html

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -122,6 +122,12 @@
122122
<td>Boolean</td>
123123
<td>Enables last-state fallback for savepoint upgrade mode. When the job is not running thus savepoint cannot be triggered but HA metadata is available for last state restore the operator can initiate the upgrade process when the flag is enabled.</td>
124124
</tr>
125+
<tr>
126+
<td><h5>kubernetes.operator.job.upgrade.last-state.job-cancel.enabled</h5></td>
127+
<td style="word-wrap: break-word;">false</td>
128+
<td>Boolean</td>
129+
<td>Cancel jobs during last-state upgrade.</td>
130+
</tr>
125131
<tr>
126132
<td><h5>kubernetes.operator.job.upgrade.last-state.max.allowed.checkpoint.age</h5></td>
127133
<td style="word-wrap: break-word;">(none)</td>

docs/layouts/shortcodes/generated/kubernetes_operator_config_configuration.html

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -212,6 +212,12 @@
212212
<td>Boolean</td>
213213
<td>Enables last-state fallback for savepoint upgrade mode. When the job is not running thus savepoint cannot be triggered but HA metadata is available for last state restore the operator can initiate the upgrade process when the flag is enabled.</td>
214214
</tr>
215+
<tr>
216+
<td><h5>kubernetes.operator.job.upgrade.last-state.job-cancel.enabled</h5></td>
217+
<td style="word-wrap: break-word;">false</td>
218+
<td>Boolean</td>
219+
<td>Cancel jobs during last-state upgrade.</td>
220+
</tr>
215221
<tr>
216222
<td><h5>kubernetes.operator.job.upgrade.last-state.max.allowed.checkpoint.age</h5></td>
217223
<td style="word-wrap: break-word;">(none)</td>

flink-kubernetes-operator-api/src/main/java/org/apache/flink/kubernetes/operator/api/docs/CrdReferenceDoclet.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -219,6 +219,9 @@ public Void scan(Element e, Integer depth) {
219219
}
220220
break;
221221
case FIELD:
222+
if (e.getModifiers().contains(Modifier.STATIC)) {
223+
return null;
224+
}
222225
out.println(
223226
"| "
224227
+ getNameOrJsonPropValue(e)

flink-kubernetes-operator-api/src/main/java/org/apache/flink/kubernetes/operator/api/status/CommonStatus.java

Lines changed: 0 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -18,12 +18,10 @@
1818
package org.apache.flink.kubernetes.operator.api.status;
1919

2020
import org.apache.flink.annotation.Experimental;
21-
import org.apache.flink.annotation.Internal;
2221
import org.apache.flink.kubernetes.operator.api.lifecycle.ResourceLifecycleState;
2322
import org.apache.flink.kubernetes.operator.api.spec.AbstractFlinkSpec;
2423
import org.apache.flink.kubernetes.operator.api.spec.JobState;
2524

26-
import com.fasterxml.jackson.annotation.JsonIgnore;
2725
import io.fabric8.crd.generator.annotation.PrinterColumn;
2826
import lombok.AllArgsConstructor;
2927
import lombok.Data;
@@ -98,10 +96,4 @@ public ResourceLifecycleState getLifecycleState() {
9896

9997
return ResourceLifecycleState.DEPLOYED;
10098
}
101-
102-
/**
103-
* Internal flag to signal that due to some condition we need to schedule a new reconciliation
104-
* loop immediately. For example autoscaler overrides have changed and we need to apply them.
105-
*/
106-
@JsonIgnore @Internal private boolean immediateReconciliationNeeded = false;
10799
}

flink-kubernetes-operator-api/src/main/java/org/apache/flink/kubernetes/operator/api/status/ReconciliationStatus.java

Lines changed: 0 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,6 @@
2020
import org.apache.flink.annotation.Experimental;
2121
import org.apache.flink.kubernetes.operator.api.AbstractFlinkResource;
2222
import org.apache.flink.kubernetes.operator.api.spec.AbstractFlinkSpec;
23-
import org.apache.flink.kubernetes.operator.api.spec.JobState;
2423
import org.apache.flink.kubernetes.operator.api.utils.SpecUtils;
2524
import org.apache.flink.kubernetes.operator.api.utils.SpecWithMeta;
2625

@@ -100,22 +99,4 @@ public boolean isLastReconciledSpecStable() {
10099
public boolean isBeforeFirstDeployment() {
101100
return lastReconciledSpec == null;
102101
}
103-
104-
/**
105-
* This method is only here for backward compatibility reasons. The current version of the
106-
* operator does not leave the resources in UPGRADING state during in-place scaling therefore
107-
* this method will always return false.
108-
*
109-
* @return True if in-place scaling is in progress.
110-
*/
111-
@JsonIgnore
112-
@Deprecated
113-
public boolean scalingInProgress() {
114-
if (isBeforeFirstDeployment() || state != ReconciliationState.UPGRADING) {
115-
return false;
116-
}
117-
var job = deserializeLastReconciledSpec().getJob();
118-
// For regular full upgrades the jobstate is suspended in UPGRADING state
119-
return job != null && job.getState() == JobState.RUNNING;
120-
}
121102
}

flink-kubernetes-operator-api/src/main/java/org/apache/flink/kubernetes/operator/api/status/SavepointInfo.java

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,8 @@
2424
import lombok.Builder;
2525
import lombok.Data;
2626
import lombok.NoArgsConstructor;
27+
import org.slf4j.Logger;
28+
import org.slf4j.LoggerFactory;
2729

2830
import java.util.ArrayList;
2931
import java.util.List;
@@ -40,6 +42,9 @@ public class SavepointInfo implements SnapshotInfo {
4042
* Last completed savepoint by the operator for manual and periodic snapshots. Only used if
4143
* FlinkStateSnapshot resources are disabled.
4244
*/
45+
private static final Logger LOG = LoggerFactory.getLogger(SavepointInfo.class);
46+
47+
/** Last completed savepoint by the operator. */
4348
private Savepoint lastSavepoint;
4449

4550
/** Trigger id of a pending savepoint operation. */
@@ -82,7 +87,11 @@ public void resetTrigger() {
8287
* @param savepoint Savepoint to be added.
8388
*/
8489
public void updateLastSavepoint(Savepoint savepoint) {
85-
if (lastSavepoint == null || !lastSavepoint.getLocation().equals(savepoint.getLocation())) {
90+
if (savepoint == null) {
91+
lastSavepoint = null;
92+
} else if (lastSavepoint == null
93+
|| !lastSavepoint.getLocation().equals(savepoint.getLocation())) {
94+
LOG.debug("Updating last savepoint to {}", savepoint);
8695
lastSavepoint = savepoint;
8796
savepointHistory.add(savepoint);
8897
if (savepoint.getTriggerType() == SnapshotTriggerType.PERIODIC) {

flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/FlinkConfigBuilder.java

Lines changed: 43 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -148,33 +148,41 @@ protected FlinkConfigBuilder applyFlinkConfiguration() {
148148
REST_SERVICE_EXPOSED_TYPE, KubernetesConfigOptions.ServiceExposedType.ClusterIP);
149149
// Set 'web.cancel.enable' to false to avoid users accidentally cancelling jobs.
150150
setDefaultConf(CANCEL_ENABLE, false);
151+
effectiveConfig.set(FLINK_VERSION, spec.getFlinkVersion());
152+
return this;
153+
}
151154

152-
if (spec.getJob() != null) {
153-
// Set 'pipeline.name' to resource name by default for application deployments.
154-
setDefaultConf(PipelineOptions.NAME, clusterId);
155-
156-
// With last-state upgrade mode, set the default value of
157-
// 'execution.checkpointing.interval'
158-
// to 5 minutes when HA is enabled.
159-
if (spec.getJob().getUpgradeMode() == UpgradeMode.LAST_STATE) {
160-
setDefaultConf(
161-
ExecutionCheckpointingOptions.CHECKPOINTING_INTERVAL,
162-
DEFAULT_CHECKPOINTING_INTERVAL);
163-
}
164-
165-
// We need to keep the application clusters around for proper operator behaviour
166-
effectiveConfig.set(SHUTDOWN_ON_APPLICATION_FINISH, false);
167-
if (HighAvailabilityMode.isHighAvailabilityModeActivated(effectiveConfig)) {
168-
setDefaultConf(SUBMIT_FAILED_JOB_ON_APPLICATION_ERROR, true);
169-
}
155+
protected static void applyJobConfig(String name, Configuration conf, JobSpec jobSpec) {
156+
// Set 'pipeline.name' to resource name by default for application deployments.
157+
setDefaultConf(conf, PipelineOptions.NAME, name);
170158

159+
// With last-state upgrade mode, set the default value of
160+
// 'execution.checkpointing.interval'
161+
// to 5 minutes when HA is enabled.
162+
if (jobSpec.getUpgradeMode() == UpgradeMode.LAST_STATE) {
171163
setDefaultConf(
172-
ExecutionCheckpointingOptions.EXTERNALIZED_CHECKPOINT,
173-
CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
164+
conf,
165+
ExecutionCheckpointingOptions.CHECKPOINTING_INTERVAL,
166+
DEFAULT_CHECKPOINTING_INTERVAL);
174167
}
168+
setDefaultConf(
169+
conf,
170+
ExecutionCheckpointingOptions.EXTERNALIZED_CHECKPOINT,
171+
CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
175172

176-
effectiveConfig.set(FLINK_VERSION, spec.getFlinkVersion());
177-
return this;
173+
if (jobSpec.getAllowNonRestoredState() != null) {
174+
conf.set(
175+
SavepointConfigOptions.SAVEPOINT_IGNORE_UNCLAIMED_STATE,
176+
jobSpec.getAllowNonRestoredState());
177+
}
178+
179+
if (jobSpec.getEntryClass() != null) {
180+
conf.set(ApplicationConfiguration.APPLICATION_MAIN_CLASS, jobSpec.getEntryClass());
181+
}
182+
183+
if (jobSpec.getArgs() != null) {
184+
conf.set(ApplicationConfiguration.APPLICATION_ARGS, Arrays.asList(jobSpec.getArgs()));
185+
}
178186
}
179187

180188
protected FlinkConfigBuilder applyLogConfiguration() throws IOException {
@@ -304,29 +312,18 @@ protected FlinkConfigBuilder applyJobOrSessionSpec() throws URISyntaxException {
304312
DeploymentOptions.TARGET, KubernetesDeploymentTarget.APPLICATION.getName());
305313

306314
if (jobSpec.getJarURI() != null) {
307-
final URI uri = new URI(jobSpec.getJarURI());
308315
effectiveConfig.set(
309-
PipelineOptions.JARS, Collections.singletonList(uri.toString()));
316+
PipelineOptions.JARS,
317+
Collections.singletonList(new URI(jobSpec.getJarURI()).toString()));
310318
}
311-
312319
effectiveConfig.set(CoreOptions.DEFAULT_PARALLELISM, getParallelism());
313320

314-
if (jobSpec.getAllowNonRestoredState() != null) {
315-
effectiveConfig.set(
316-
SavepointConfigOptions.SAVEPOINT_IGNORE_UNCLAIMED_STATE,
317-
jobSpec.getAllowNonRestoredState());
318-
}
319-
320-
if (jobSpec.getEntryClass() != null) {
321-
effectiveConfig.set(
322-
ApplicationConfiguration.APPLICATION_MAIN_CLASS, jobSpec.getEntryClass());
323-
}
321+
// We need to keep the application clusters around for proper operator behaviour
322+
effectiveConfig.set(SHUTDOWN_ON_APPLICATION_FINISH, false);
323+
setDefaultConf(SUBMIT_FAILED_JOB_ON_APPLICATION_ERROR, true);
324324

325-
if (jobSpec.getArgs() != null) {
326-
effectiveConfig.set(
327-
ApplicationConfiguration.APPLICATION_ARGS,
328-
Arrays.asList(jobSpec.getArgs()));
329-
}
325+
// Generic shared job config logic
326+
applyJobConfig(clusterId, effectiveConfig, jobSpec);
330327
} else {
331328
effectiveConfig.set(
332329
DeploymentOptions.TARGET, KubernetesDeploymentTarget.SESSION.getName());
@@ -423,8 +420,12 @@ public static Configuration buildFrom(
423420
}
424421

425422
private <T> void setDefaultConf(ConfigOption<T> option, T value) {
426-
if (!effectiveConfig.contains(option)) {
427-
effectiveConfig.set(option, value);
423+
setDefaultConf(effectiveConfig, option, value);
424+
}
425+
426+
private static <T> void setDefaultConf(Configuration conf, ConfigOption<T> option, T value) {
427+
if (!conf.contains(option)) {
428+
conf.set(option, value);
428429
}
429430
}
430431

flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/FlinkConfigManager.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,7 @@
5959
import java.util.function.Consumer;
6060

6161
import static org.apache.flink.configuration.CheckpointingOptions.SAVEPOINT_DIRECTORY;
62+
import static org.apache.flink.kubernetes.operator.config.FlinkConfigBuilder.applyJobConfig;
6263
import static org.apache.flink.kubernetes.operator.config.KubernetesOperatorConfigOptions.K8S_OP_CONF_PREFIX;
6364
import static org.apache.flink.kubernetes.operator.config.KubernetesOperatorConfigOptions.NAMESPACE_CONF_PREFIX;
6465
import static org.apache.flink.kubernetes.operator.config.KubernetesOperatorConfigOptions.OPERATOR_DYNAMIC_CONFIG_CHECK_INTERVAL;
@@ -292,14 +293,15 @@ private void applyConfigsFromCurrentSpec(
292293
* @return Session job config
293294
*/
294295
public Configuration getSessionJobConfig(
295-
FlinkDeployment deployment, FlinkSessionJobSpec sessionJobSpec) {
296+
String name, FlinkDeployment deployment, FlinkSessionJobSpec sessionJobSpec) {
296297
Configuration sessionJobConfig = getObserveConfig(deployment);
297298

298299
// merge session job specific config
299300
var sessionJobFlinkConfiguration = sessionJobSpec.getFlinkConfiguration();
300301
if (sessionJobFlinkConfiguration != null) {
301302
sessionJobFlinkConfiguration.forEach(sessionJobConfig::setString);
302303
}
304+
applyJobConfig(name, sessionJobConfig, sessionJobSpec.getJob());
303305
return sessionJobConfig;
304306
}
305307

flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/KubernetesOperatorConfigOptions.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -465,6 +465,13 @@ public static String operatorConfigKey(String key) {
465465
.withDescription(
466466
"Max allowed checkpoint age for initiating last-state upgrades on running jobs. If a checkpoint is not available within the desired age (and nothing in progress) a savepoint will be triggered.");
467467

468+
@Documentation.Section(SECTION_DYNAMIC)
469+
public static final ConfigOption<Boolean> OPERATOR_JOB_UPGRADE_LAST_STATE_CANCEL_JOB =
470+
operatorConfig("job.upgrade.last-state.job-cancel.enabled")
471+
.booleanType()
472+
.defaultValue(false)
473+
.withDescription("Cancel jobs during last-state upgrade.");
474+
468475
@Documentation.Section(SECTION_ADVANCED)
469476
public static final ConfigOption<Boolean> OPERATOR_HEALTH_PROBE_ENABLED =
470477
operatorConfig("health.probe.enabled")

0 commit comments

Comments
 (0)