Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions docs/content/docs/custom-resource/reference.md
Original file line number Diff line number Diff line change
Expand Up @@ -182,6 +182,7 @@ This serves as a full reference for FlinkDeployment and FlinkSessionJob custom r
| upgradeMode | org.apache.flink.kubernetes.operator.api.spec.UpgradeMode | Upgrade mode of the Flink job. |
| allowNonRestoredState | java.lang.Boolean | Allow checkpoint state that cannot be mapped to any job vertex in tasks. |
| savepointRedeployNonce | java.lang.Long | Nonce used to trigger a full redeployment of the job from the savepoint path specified in initialSavepointPath. In order to trigger redeployment, change the number to a different non-null value. Rollback is not possible after redeployment. |
| autoscalerResetNonce | java.lang.Long | Nonce used to reset the autoscaler metrics, parallelism overrides and history for the job. This can be used to quickly go back to the initial user-provided parallelism settings without having to toggle the autoscaler on and off. In order to trigger the reset behaviour simply change the nonce to a new non-null value. |

### JobState
**Class**: org.apache.flink.kubernetes.operator.api.spec.JobState
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -97,4 +97,13 @@ public class JobSpec implements Diffable<JobSpec> {
*/
@SpecDiff(value = DiffType.SAVEPOINT_REDEPLOY, onNullIgnore = true)
private Long savepointRedeployNonce;

/**
* Nonce used to reset the autoscaler metrics, parallelism overrides and history for the job.
* This can be used to quickly go back to the initial user-provided parallelism settings without
* having to toggle the autoscaler on and off. In order to trigger the reset behaviour simply
* change the nonce to a new non-null value.
*/
@SpecDiff(value = DiffType.IGNORE)
private Long autoscalerResetNonce;
}
Original file line number Diff line number Diff line change
Expand Up @@ -211,6 +211,19 @@ public static <SPEC extends AbstractFlinkSpec> void updateLastReconciledSnapshot
reconciliationStatus.setReconciliationTimestamp(System.currentTimeMillis());
}

public static <SPEC extends AbstractFlinkSpec> void updateLastReconciledAutoscalerResetNonce(
AbstractFlinkResource<SPEC, ?> target) {
var spec = target.getSpec();
var reconciliationStatus = target.getStatus().getReconciliationStatus();
var lastReconciledSpec = reconciliationStatus.deserializeLastReconciledSpec();

lastReconciledSpec
.getJob()
.setAutoscalerResetNonce(spec.getJob().getAutoscalerResetNonce());
reconciliationStatus.serializeAndSetLastReconciledSpec(lastReconciledSpec, target);
reconciliationStatus.setReconciliationTimestamp(System.currentTimeMillis());
}

private static void updateLastReconciledJobSpec(
JobSpec lastReconciledJobSpec, JobSpec jobSpec, SnapshotType snapshotType) {
switch (snapshotType) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -185,11 +185,27 @@ private Optional<String> getInitialSnapshotPath(AbstractFlinkSpec spec) {

private void applyAutoscaler(FlinkResourceContext<CR> ctx) throws Exception {
var autoScalerCtx = ctx.getJobAutoScalerContext();
var resource = ctx.getResource();
boolean autoscalerEnabled =
ctx.getResource().getSpec().getJob() != null
resource.getSpec().getJob() != null
&& ctx.getObserveConfig().getBoolean(AUTOSCALER_ENABLED);
autoScalerCtx.getConfiguration().set(AUTOSCALER_ENABLED, autoscalerEnabled);

var reconStatus = resource.getStatus().getReconciliationStatus();
if (!reconStatus.isBeforeFirstDeployment() && autoscalerEnabled) {
var newResetNonce = resource.getSpec().getJob().getAutoscalerResetNonce();
// check if the nonce changed to a non-null value
if (newResetNonce != null
&& !newResetNonce.equals(
reconStatus
.deserializeLastReconciledSpec()
.getJob()
.getAutoscalerResetNonce())) {
autoscaler.cleanup(autoScalerCtx);
ReconciliationUtils.updateLastReconciledAutoscalerResetNonce(resource);
}
}

autoscaler.scale(autoScalerCtx);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.autoscaler.JobAutoScaler;
import org.apache.flink.autoscaler.NoopJobAutoscaler;
import org.apache.flink.autoscaler.config.AutoScalerOptions;
import org.apache.flink.client.program.rest.RestClusterClient;
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.Configuration;
Expand Down Expand Up @@ -57,6 +58,7 @@
import org.apache.flink.kubernetes.operator.api.status.SnapshotInfo;
import org.apache.flink.kubernetes.operator.api.status.SnapshotTriggerType;
import org.apache.flink.kubernetes.operator.api.utils.FlinkResourceUtils;
import org.apache.flink.kubernetes.operator.api.utils.SpecUtils;
import org.apache.flink.kubernetes.operator.autoscaler.KubernetesJobAutoScalerContext;
import org.apache.flink.kubernetes.operator.config.FlinkOperatorConfiguration;
import org.apache.flink.kubernetes.operator.config.KubernetesOperatorConfigOptions;
Expand Down Expand Up @@ -947,29 +949,43 @@ public void testApplyAutoscalerParallelism() throws Exception {
public void scale(KubernetesJobAutoScalerContext ctx) {
overrideFunction.get().accept(ctx.getResource().getSpec());
}

@Override
public void cleanup(KubernetesJobAutoScalerContext ctx) {
overrideFunction.set(s -> {});
}
};
var v1 = new JobVertexID();

appReconciler = new ApplicationReconciler(eventRecorder, statusRecorder, autoscaler);

var deployment = TestUtils.buildApplicationCluster();
var config = deployment.getSpec().getFlinkConfiguration();
config.put(AutoScalerOptions.AUTOSCALER_ENABLED.key(), "true");
config.put(PipelineOptions.PARALLELISM_OVERRIDES.key(), v1 + ":1");

var specCopy = SpecUtils.clone(deployment.getSpec());

appReconciler.reconcile(ctxFactory.getResourceContext(deployment, context));
verifyAndSetRunningJobsToStatus(deployment, flinkService.listJobs());
deployment.setSpec(SpecUtils.clone(specCopy));

// Job running verify no upgrades if overrides are empty
appReconciler.reconcile(ctxFactory.getResourceContext(deployment, context));
deployment.setSpec(SpecUtils.clone(specCopy));
assertEquals(
ReconciliationState.DEPLOYED,
deployment.getStatus().getReconciliationStatus().getState());
assertEquals(RUNNING, deployment.getStatus().getJobStatus().getState());

// Test overrides are applied correctly
var v1 = new JobVertexID();
overrideFunction.set(
s ->
s.getFlinkConfiguration()
.put(PipelineOptions.PARALLELISM_OVERRIDES.key(), v1 + ":2"));

appReconciler.reconcile(ctxFactory.getResourceContext(deployment, context));
deployment.setSpec(SpecUtils.clone(specCopy));
assertEquals(
ReconciliationState.UPGRADING,
deployment.getStatus().getReconciliationStatus().getState());
Expand All @@ -979,6 +995,55 @@ public void scale(KubernetesJobAutoScalerContext ctx) {
.getResourceContext(deployment, context)
.getObserveConfig()
.get(PipelineOptions.PARALLELISM_OVERRIDES));

// Set the job into running state (scale up completed)
appReconciler.reconcile(ctxFactory.getResourceContext(deployment, context));
deployment.setSpec(SpecUtils.clone(specCopy));
verifyAndSetRunningJobsToStatus(deployment, flinkService.listJobs());
deployment.setSpec(SpecUtils.clone(specCopy));

// Make sure new reset nonce clears autoscaler
deployment.getSpec().getJob().setAutoscalerResetNonce(1L);
appReconciler.reconcile(ctxFactory.getResourceContext(deployment, context));
deployment.setSpec(SpecUtils.clone(specCopy));
assertEquals(
ReconciliationState.UPGRADING,
deployment.getStatus().getReconciliationStatus().getState());
assertEquals(
Map.of(v1.toHexString(), "1"),
ctxFactory
.getResourceContext(deployment, context)
.getObserveConfig()
.get(PipelineOptions.PARALLELISM_OVERRIDES));
assertEquals(
1L,
deployment
.getStatus()
.getReconciliationStatus()
.deserializeLastReconciledSpec()
.getJob()
.getAutoscalerResetNonce());

appReconciler.reconcile(ctxFactory.getResourceContext(deployment, context));
verifyAndSetRunningJobsToStatus(deployment, flinkService.listJobs());
deployment.setSpec(SpecUtils.clone(specCopy));

// Make sure autoscaler reset nonce properly updated even if no deployment happens

deployment.getSpec().getJob().setAutoscalerResetNonce(2L);
appReconciler.reconcile(ctxFactory.getResourceContext(deployment, context));
deployment.setSpec(SpecUtils.clone(specCopy));
assertEquals(
2L,
deployment
.getStatus()
.getReconciliationStatus()
.deserializeLastReconciledSpec()
.getJob()
.getAutoscalerResetNonce());
assertEquals(
ReconciliationState.DEPLOYED,
deployment.getStatus().getReconciliationStatus().getState());
}

@ParameterizedTest
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,8 @@ spec:
items:
type: string
type: array
autoscalerResetNonce:
type: integer
checkpointTriggerNonce:
type: integer
entryClass:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,8 @@ spec:
items:
type: string
type: array
autoscalerResetNonce:
type: integer
checkpointTriggerNonce:
type: integer
entryClass:
Expand Down
Loading