diff --git a/docs/content/docs/custom-resource/reference.md b/docs/content/docs/custom-resource/reference.md index 34051837cb..41bd17efc5 100644 --- a/docs/content/docs/custom-resource/reference.md +++ b/docs/content/docs/custom-resource/reference.md @@ -182,6 +182,7 @@ This serves as a full reference for FlinkDeployment and FlinkSessionJob custom r | upgradeMode | org.apache.flink.kubernetes.operator.api.spec.UpgradeMode | Upgrade mode of the Flink job. | | allowNonRestoredState | java.lang.Boolean | Allow checkpoint state that cannot be mapped to any job vertex in tasks. | | savepointRedeployNonce | java.lang.Long | Nonce used to trigger a full redeployment of the job from the savepoint path specified in initialSavepointPath. In order to trigger redeployment, change the number to a different non-null value. Rollback is not possible after redeployment. | +| autoscalerResetNonce | java.lang.Long | Nonce used to reset the autoscaler metrics, parallelism overrides and history for the job. This can be used to quickly go back to the initial user-provided parallelism settings without having to toggle the autoscaler on and off. In order to trigger the reset behaviour simply change the nonce to a new non-null value. | ### JobState **Class**: org.apache.flink.kubernetes.operator.api.spec.JobState diff --git a/flink-kubernetes-operator-api/src/main/java/org/apache/flink/kubernetes/operator/api/spec/JobSpec.java b/flink-kubernetes-operator-api/src/main/java/org/apache/flink/kubernetes/operator/api/spec/JobSpec.java index a6c582cc23..7a08260d14 100644 --- a/flink-kubernetes-operator-api/src/main/java/org/apache/flink/kubernetes/operator/api/spec/JobSpec.java +++ b/flink-kubernetes-operator-api/src/main/java/org/apache/flink/kubernetes/operator/api/spec/JobSpec.java @@ -97,4 +97,13 @@ public class JobSpec implements Diffable { */ @SpecDiff(value = DiffType.SAVEPOINT_REDEPLOY, onNullIgnore = true) private Long savepointRedeployNonce; + + /** + * Nonce used to reset the autoscaler metrics, parallelism overrides and history for the job. + * This can be used to quickly go back to the initial user-provided parallelism settings without + * having to toggle the autoscaler on and off. In order to trigger the reset behaviour simply + * change the nonce to a new non-null value. + */ + @SpecDiff(value = DiffType.IGNORE) + private Long autoscalerResetNonce; } diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/ReconciliationUtils.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/ReconciliationUtils.java index 37e2bde90a..2936501a88 100644 --- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/ReconciliationUtils.java +++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/ReconciliationUtils.java @@ -211,6 +211,19 @@ public static void updateLastReconciledSnapshot reconciliationStatus.setReconciliationTimestamp(System.currentTimeMillis()); } + public static void updateLastReconciledAutoscalerResetNonce( + AbstractFlinkResource target) { + var spec = target.getSpec(); + var reconciliationStatus = target.getStatus().getReconciliationStatus(); + var lastReconciledSpec = reconciliationStatus.deserializeLastReconciledSpec(); + + lastReconciledSpec + .getJob() + .setAutoscalerResetNonce(spec.getJob().getAutoscalerResetNonce()); + reconciliationStatus.serializeAndSetLastReconciledSpec(lastReconciledSpec, target); + reconciliationStatus.setReconciliationTimestamp(System.currentTimeMillis()); + } + private static void updateLastReconciledJobSpec( JobSpec lastReconciledJobSpec, JobSpec jobSpec, SnapshotType snapshotType) { switch (snapshotType) { diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractFlinkResourceReconciler.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractFlinkResourceReconciler.java index 3b2048c9f7..7f8af3ca88 100644 --- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractFlinkResourceReconciler.java +++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractFlinkResourceReconciler.java @@ -185,11 +185,27 @@ private Optional getInitialSnapshotPath(AbstractFlinkSpec spec) { private void applyAutoscaler(FlinkResourceContext ctx) throws Exception { var autoScalerCtx = ctx.getJobAutoScalerContext(); + var resource = ctx.getResource(); boolean autoscalerEnabled = - ctx.getResource().getSpec().getJob() != null + resource.getSpec().getJob() != null && ctx.getObserveConfig().getBoolean(AUTOSCALER_ENABLED); autoScalerCtx.getConfiguration().set(AUTOSCALER_ENABLED, autoscalerEnabled); + var reconStatus = resource.getStatus().getReconciliationStatus(); + if (!reconStatus.isBeforeFirstDeployment() && autoscalerEnabled) { + var newResetNonce = resource.getSpec().getJob().getAutoscalerResetNonce(); + // check if the nonce changed to a non-null value + if (newResetNonce != null + && !newResetNonce.equals( + reconStatus + .deserializeLastReconciledSpec() + .getJob() + .getAutoscalerResetNonce())) { + autoscaler.cleanup(autoScalerCtx); + ReconciliationUtils.updateLastReconciledAutoscalerResetNonce(resource); + } + } + autoscaler.scale(autoScalerCtx); } diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconcilerTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconcilerTest.java index 34a71d3bcc..47ce5e4eb2 100644 --- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconcilerTest.java +++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconcilerTest.java @@ -21,6 +21,7 @@ import org.apache.flink.api.java.tuple.Tuple3; import org.apache.flink.autoscaler.JobAutoScaler; import org.apache.flink.autoscaler.NoopJobAutoscaler; +import org.apache.flink.autoscaler.config.AutoScalerOptions; import org.apache.flink.client.program.rest.RestClusterClient; import org.apache.flink.configuration.ConfigOption; import org.apache.flink.configuration.Configuration; @@ -57,6 +58,7 @@ import org.apache.flink.kubernetes.operator.api.status.SnapshotInfo; import org.apache.flink.kubernetes.operator.api.status.SnapshotTriggerType; import org.apache.flink.kubernetes.operator.api.utils.FlinkResourceUtils; +import org.apache.flink.kubernetes.operator.api.utils.SpecUtils; import org.apache.flink.kubernetes.operator.autoscaler.KubernetesJobAutoScalerContext; import org.apache.flink.kubernetes.operator.config.FlinkOperatorConfiguration; import org.apache.flink.kubernetes.operator.config.KubernetesOperatorConfigOptions; @@ -947,29 +949,43 @@ public void testApplyAutoscalerParallelism() throws Exception { public void scale(KubernetesJobAutoScalerContext ctx) { overrideFunction.get().accept(ctx.getResource().getSpec()); } + + @Override + public void cleanup(KubernetesJobAutoScalerContext ctx) { + overrideFunction.set(s -> {}); + } }; + var v1 = new JobVertexID(); appReconciler = new ApplicationReconciler(eventRecorder, statusRecorder, autoscaler); var deployment = TestUtils.buildApplicationCluster(); + var config = deployment.getSpec().getFlinkConfiguration(); + config.put(AutoScalerOptions.AUTOSCALER_ENABLED.key(), "true"); + config.put(PipelineOptions.PARALLELISM_OVERRIDES.key(), v1 + ":1"); + + var specCopy = SpecUtils.clone(deployment.getSpec()); + appReconciler.reconcile(ctxFactory.getResourceContext(deployment, context)); verifyAndSetRunningJobsToStatus(deployment, flinkService.listJobs()); + deployment.setSpec(SpecUtils.clone(specCopy)); // Job running verify no upgrades if overrides are empty appReconciler.reconcile(ctxFactory.getResourceContext(deployment, context)); + deployment.setSpec(SpecUtils.clone(specCopy)); assertEquals( ReconciliationState.DEPLOYED, deployment.getStatus().getReconciliationStatus().getState()); assertEquals(RUNNING, deployment.getStatus().getJobStatus().getState()); // Test overrides are applied correctly - var v1 = new JobVertexID(); overrideFunction.set( s -> s.getFlinkConfiguration() .put(PipelineOptions.PARALLELISM_OVERRIDES.key(), v1 + ":2")); appReconciler.reconcile(ctxFactory.getResourceContext(deployment, context)); + deployment.setSpec(SpecUtils.clone(specCopy)); assertEquals( ReconciliationState.UPGRADING, deployment.getStatus().getReconciliationStatus().getState()); @@ -979,6 +995,55 @@ public void scale(KubernetesJobAutoScalerContext ctx) { .getResourceContext(deployment, context) .getObserveConfig() .get(PipelineOptions.PARALLELISM_OVERRIDES)); + + // Set the job into running state (scale up completed) + appReconciler.reconcile(ctxFactory.getResourceContext(deployment, context)); + deployment.setSpec(SpecUtils.clone(specCopy)); + verifyAndSetRunningJobsToStatus(deployment, flinkService.listJobs()); + deployment.setSpec(SpecUtils.clone(specCopy)); + + // Make sure new reset nonce clears autoscaler + deployment.getSpec().getJob().setAutoscalerResetNonce(1L); + appReconciler.reconcile(ctxFactory.getResourceContext(deployment, context)); + deployment.setSpec(SpecUtils.clone(specCopy)); + assertEquals( + ReconciliationState.UPGRADING, + deployment.getStatus().getReconciliationStatus().getState()); + assertEquals( + Map.of(v1.toHexString(), "1"), + ctxFactory + .getResourceContext(deployment, context) + .getObserveConfig() + .get(PipelineOptions.PARALLELISM_OVERRIDES)); + assertEquals( + 1L, + deployment + .getStatus() + .getReconciliationStatus() + .deserializeLastReconciledSpec() + .getJob() + .getAutoscalerResetNonce()); + + appReconciler.reconcile(ctxFactory.getResourceContext(deployment, context)); + verifyAndSetRunningJobsToStatus(deployment, flinkService.listJobs()); + deployment.setSpec(SpecUtils.clone(specCopy)); + + // Make sure autoscaler reset nonce properly updated even if no deployment happens + + deployment.getSpec().getJob().setAutoscalerResetNonce(2L); + appReconciler.reconcile(ctxFactory.getResourceContext(deployment, context)); + deployment.setSpec(SpecUtils.clone(specCopy)); + assertEquals( + 2L, + deployment + .getStatus() + .getReconciliationStatus() + .deserializeLastReconciledSpec() + .getJob() + .getAutoscalerResetNonce()); + assertEquals( + ReconciliationState.DEPLOYED, + deployment.getStatus().getReconciliationStatus().getState()); } @ParameterizedTest diff --git a/helm/flink-kubernetes-operator/crds/flinkdeployments.flink.apache.org-v1.yml b/helm/flink-kubernetes-operator/crds/flinkdeployments.flink.apache.org-v1.yml index 6042799dc5..c94c13871e 100644 --- a/helm/flink-kubernetes-operator/crds/flinkdeployments.flink.apache.org-v1.yml +++ b/helm/flink-kubernetes-operator/crds/flinkdeployments.flink.apache.org-v1.yml @@ -84,6 +84,8 @@ spec: items: type: string type: array + autoscalerResetNonce: + type: integer checkpointTriggerNonce: type: integer entryClass: diff --git a/helm/flink-kubernetes-operator/crds/flinksessionjobs.flink.apache.org-v1.yml b/helm/flink-kubernetes-operator/crds/flinksessionjobs.flink.apache.org-v1.yml index 431787c7a7..93515bf143 100644 --- a/helm/flink-kubernetes-operator/crds/flinksessionjobs.flink.apache.org-v1.yml +++ b/helm/flink-kubernetes-operator/crds/flinksessionjobs.flink.apache.org-v1.yml @@ -45,6 +45,8 @@ spec: items: type: string type: array + autoscalerResetNonce: + type: integer checkpointTriggerNonce: type: integer entryClass: