Skip to content

Commit ff8bbbc

Browse files
committed
Review comments
1 parent 324fc9b commit ff8bbbc

File tree

6 files changed

+6
-23
lines changed

6 files changed

+6
-23
lines changed

docs/layouts/shortcodes/generated/dynamic_section.html

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -126,7 +126,7 @@
126126
<td><h5>kubernetes.operator.job.upgrade.last-state.job-cancel.enabled</h5></td>
127127
<td style="word-wrap: break-word;">false</td>
128128
<td>Boolean</td>
129-
<td>Cancel jobs during last-state upgrade.</td>
129+
<td>Cancel jobs during last-state upgrade. This config is ignored for session jobs where cancel is the only mechanism to perform this type of upgrade.</td>
130130
</tr>
131131
<tr>
132132
<td><h5>kubernetes.operator.job.upgrade.last-state.max.allowed.checkpoint.age</h5></td>

docs/layouts/shortcodes/generated/kubernetes_operator_config_configuration.html

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -216,7 +216,7 @@
216216
<td><h5>kubernetes.operator.job.upgrade.last-state.job-cancel.enabled</h5></td>
217217
<td style="word-wrap: break-word;">false</td>
218218
<td>Boolean</td>
219-
<td>Cancel jobs during last-state upgrade.</td>
219+
<td>Cancel jobs during last-state upgrade. This config is ignored for session jobs where cancel is the only mechanism to perform this type of upgrade.</td>
220220
</tr>
221221
<tr>
222222
<td><h5>kubernetes.operator.job.upgrade.last-state.max.allowed.checkpoint.age</h5></td>

flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/KubernetesOperatorConfigOptions.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -470,7 +470,8 @@ public static String operatorConfigKey(String key) {
470470
operatorConfig("job.upgrade.last-state.job-cancel.enabled")
471471
.booleanType()
472472
.defaultValue(false)
473-
.withDescription("Cancel jobs during last-state upgrade.");
473+
.withDescription(
474+
"Cancel jobs during last-state upgrade. This config is ignored for session jobs where cancel is the only mechanism to perform this type of upgrade.");
474475

475476
@Documentation.Section(SECTION_ADVANCED)
476477
public static final ConfigOption<Boolean> OPERATOR_HEALTH_PROBE_ENABLED =

flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/JobStatusObserver.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -122,7 +122,7 @@ protected void onTargetJobNotFound(FlinkResourceContext<R> ctx) {
122122
// upgrading state and retry the upgrade (if possible)
123123
resource.getStatus().getReconciliationStatus().setState(ReconciliationState.DEPLOYED);
124124
}
125-
jobStatus.setState(org.apache.flink.api.common.JobStatus.RECONCILING.name());
125+
jobStatus.setState(JobStatus.RECONCILING.name());
126126
resource.getStatus().setError(JOB_NOT_FOUND_ERR);
127127
}
128128

flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/ReconciliationUtils.java

Lines changed: 1 addition & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,6 @@
2626
import org.apache.flink.kubernetes.operator.api.spec.AbstractFlinkSpec;
2727
import org.apache.flink.kubernetes.operator.api.spec.JobSpec;
2828
import org.apache.flink.kubernetes.operator.api.spec.JobState;
29-
import org.apache.flink.kubernetes.operator.api.spec.UpgradeMode;
3029
import org.apache.flink.kubernetes.operator.api.status.CommonStatus;
3130
import org.apache.flink.kubernetes.operator.api.status.FlinkDeploymentStatus;
3231
import org.apache.flink.kubernetes.operator.api.status.FlinkStateSnapshotStatus;
@@ -43,7 +42,6 @@
4342
import org.apache.flink.kubernetes.operator.exception.ValidationException;
4443
import org.apache.flink.kubernetes.operator.utils.FlinkUtils;
4544
import org.apache.flink.kubernetes.operator.utils.StatusRecorder;
46-
import org.apache.flink.runtime.jobmanager.HighAvailabilityMode;
4745

4846
import io.fabric8.kubernetes.client.CustomResource;
4947
import io.javaoperatorsdk.operator.api.reconciler.ErrorStatusUpdateControl;
@@ -296,6 +294,7 @@ UpdateControl<R> toUpdateControl(
296294
}
297295
}
298296

297+
@VisibleForTesting
299298
public static Duration rescheduleAfter(
300299
JobManagerDeploymentStatus status,
301300
FlinkDeployment flinkDeployment,
@@ -308,7 +307,6 @@ public static Duration rescheduleAfter(
308307
case READY:
309308
rescheduleAfter =
310309
savepointInProgress(flinkDeployment.getStatus().getJobStatus())
311-
|| isJobCancelling(flinkDeployment.getStatus())
312310
? operatorConfiguration.getProgressCheckInterval()
313311
: operatorConfiguration.getReconcileInterval();
314312
break;
@@ -329,18 +327,6 @@ private static boolean savepointInProgress(JobStatus jobStatus) {
329327
return StringUtils.isNotEmpty(jobStatus.getSavepointInfo().getTriggerId());
330328
}
331329

332-
public static boolean isUpgradeModeChangedToLastStateAndHADisabledPreviously(
333-
AbstractFlinkResource<?, ?> flinkApp, Configuration observeConfig) {
334-
335-
var deployedSpec = getDeployedSpec(flinkApp);
336-
UpgradeMode previousUpgradeMode = deployedSpec.getJob().getUpgradeMode();
337-
UpgradeMode currentUpgradeMode = flinkApp.getSpec().getJob().getUpgradeMode();
338-
339-
return previousUpgradeMode != UpgradeMode.LAST_STATE
340-
&& currentUpgradeMode == UpgradeMode.LAST_STATE
341-
&& !HighAvailabilityMode.isHighAvailabilityModeActivated(observeConfig);
342-
}
343-
344330
public static <SPEC extends AbstractFlinkSpec> SPEC getDeployedSpec(
345331
AbstractFlinkResource<SPEC, ?> deployment) {
346332
var reconciliationStatus = deployment.getStatus().getReconciliationStatus();

flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/FlinkService.java

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -39,8 +39,6 @@
3939
import io.fabric8.kubernetes.client.KubernetesClient;
4040
import lombok.AllArgsConstructor;
4141
import lombok.Getter;
42-
import org.slf4j.Logger;
43-
import org.slf4j.LoggerFactory;
4442

4543
import javax.annotation.Nullable;
4644

@@ -51,8 +49,6 @@
5149
/** Service for submitting and interacting with Flink clusters and jobs. */
5250
public interface FlinkService {
5351

54-
Logger LOG = LoggerFactory.getLogger(FlinkService.class);
55-
5652
KubernetesClient getKubernetesClient();
5753

5854
void submitApplicationCluster(JobSpec jobSpec, Configuration conf, boolean requireHaMetadata)

0 commit comments

Comments
 (0)