Skip to content

Commit e7ce6df

Browse files
committed
[FLINK-36640] Add autoscalerResetNonce to jobSpec
1 parent 7a9bff7 commit e7ce6df

File tree

7 files changed

+110
-2
lines changed

7 files changed

+110
-2
lines changed

docs/content/docs/custom-resource/reference.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -182,6 +182,7 @@ This serves as a full reference for FlinkDeployment and FlinkSessionJob custom r
182182
| upgradeMode | org.apache.flink.kubernetes.operator.api.spec.UpgradeMode | Upgrade mode of the Flink job. |
183183
| allowNonRestoredState | java.lang.Boolean | Allow checkpoint state that cannot be mapped to any job vertex in tasks. |
184184
| savepointRedeployNonce | java.lang.Long | Nonce used to trigger a full redeployment of the job from the savepoint path specified in initialSavepointPath. In order to trigger redeployment, change the number to a different non-null value. Rollback is not possible after redeployment. |
185+
| autoscalerResetNonce | java.lang.Long | Nonce used to reset the autoscaler metrics, parallelism overrides and history for the job. This can be used to quickly go back to the initial user-provided parallelism settings without having to toggle the autoscaler on and off. In order to trigger the reset behaviour, simply change the nonce to a new non-null value. |
185186

186187
### JobState
187188
**Class**: org.apache.flink.kubernetes.operator.api.spec.JobState

flink-kubernetes-operator-api/src/main/java/org/apache/flink/kubernetes/operator/api/spec/JobSpec.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -97,4 +97,13 @@ public class JobSpec implements Diffable<JobSpec> {
9797
*/
9898
@SpecDiff(value = DiffType.SAVEPOINT_REDEPLOY, onNullIgnore = true)
9999
private Long savepointRedeployNonce;
100+
101+
/**
102+
* Nonce used to reset the autoscaler metrics, parallelism overrides and history for the job.
103+
* This can be used to quickly go back to the initial user-provided parallelism settings without
104+
* having to toggle the autoscaler on and off. In order to trigger the reset behaviour simply
105+
* change the nonce to a new non-null value.
106+
*/
107+
@SpecDiff(value = DiffType.IGNORE)
108+
private Long autoscalerResetNonce;
100109
}

flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/ReconciliationUtils.java

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -211,6 +211,19 @@ public static <SPEC extends AbstractFlinkSpec> void updateLastReconciledSnapshot
211211
reconciliationStatus.setReconciliationTimestamp(System.currentTimeMillis());
212212
}
213213

214+
public static <SPEC extends AbstractFlinkSpec> void updateLastReconciledAutoscalerResetNonce(
215+
AbstractFlinkResource<SPEC, ?> target) {
216+
var spec = target.getSpec();
217+
var reconciliationStatus = target.getStatus().getReconciliationStatus();
218+
var lastReconciledSpec = reconciliationStatus.deserializeLastReconciledSpec();
219+
220+
lastReconciledSpec
221+
.getJob()
222+
.setAutoscalerResetNonce(spec.getJob().getAutoscalerResetNonce());
223+
reconciliationStatus.serializeAndSetLastReconciledSpec(lastReconciledSpec, target);
224+
reconciliationStatus.setReconciliationTimestamp(System.currentTimeMillis());
225+
}
226+
214227
private static void updateLastReconciledJobSpec(
215228
JobSpec lastReconciledJobSpec, JobSpec jobSpec, SnapshotType snapshotType) {
216229
switch (snapshotType) {

flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractFlinkResourceReconciler.java

Lines changed: 17 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -185,11 +185,27 @@ private Optional<String> getInitialSnapshotPath(AbstractFlinkSpec spec) {
185185

186186
private void applyAutoscaler(FlinkResourceContext<CR> ctx) throws Exception {
187187
var autoScalerCtx = ctx.getJobAutoScalerContext();
188+
var resource = ctx.getResource();
188189
boolean autoscalerEnabled =
189-
ctx.getResource().getSpec().getJob() != null
190+
resource.getSpec().getJob() != null
190191
&& ctx.getObserveConfig().getBoolean(AUTOSCALER_ENABLED);
191192
autoScalerCtx.getConfiguration().set(AUTOSCALER_ENABLED, autoscalerEnabled);
192193

194+
var reconStatus = resource.getStatus().getReconciliationStatus();
195+
if (!reconStatus.isBeforeFirstDeployment() && autoscalerEnabled) {
196+
var newResetNonce = resource.getSpec().getJob().getAutoscalerResetNonce();
197+
// check if the nonce changed to a non-null value
198+
if (newResetNonce != null
199+
&& !newResetNonce.equals(
200+
reconStatus
201+
.deserializeLastReconciledSpec()
202+
.getJob()
203+
.getAutoscalerResetNonce())) {
204+
autoscaler.cleanup(autoScalerCtx);
205+
ReconciliationUtils.updateLastReconciledAutoscalerResetNonce(resource);
206+
}
207+
}
208+
193209
autoscaler.scale(autoScalerCtx);
194210
}
195211

flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconcilerTest.java

Lines changed: 66 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
import org.apache.flink.api.java.tuple.Tuple3;
2222
import org.apache.flink.autoscaler.JobAutoScaler;
2323
import org.apache.flink.autoscaler.NoopJobAutoscaler;
24+
import org.apache.flink.autoscaler.config.AutoScalerOptions;
2425
import org.apache.flink.client.program.rest.RestClusterClient;
2526
import org.apache.flink.configuration.ConfigOption;
2627
import org.apache.flink.configuration.Configuration;
@@ -57,6 +58,7 @@
5758
import org.apache.flink.kubernetes.operator.api.status.SnapshotInfo;
5859
import org.apache.flink.kubernetes.operator.api.status.SnapshotTriggerType;
5960
import org.apache.flink.kubernetes.operator.api.utils.FlinkResourceUtils;
61+
import org.apache.flink.kubernetes.operator.api.utils.SpecUtils;
6062
import org.apache.flink.kubernetes.operator.autoscaler.KubernetesJobAutoScalerContext;
6163
import org.apache.flink.kubernetes.operator.config.FlinkOperatorConfiguration;
6264
import org.apache.flink.kubernetes.operator.config.KubernetesOperatorConfigOptions;
@@ -947,29 +949,43 @@ public void testApplyAutoscalerParallelism() throws Exception {
947949
public void scale(KubernetesJobAutoScalerContext ctx) {
948950
overrideFunction.get().accept(ctx.getResource().getSpec());
949951
}
952+
953+
@Override
954+
public void cleanup(KubernetesJobAutoScalerContext ctx) {
955+
overrideFunction.set(s -> {});
956+
}
950957
};
958+
var v1 = new JobVertexID();
951959

952960
appReconciler = new ApplicationReconciler(eventRecorder, statusRecorder, autoscaler);
953961

954962
var deployment = TestUtils.buildApplicationCluster();
963+
var config = deployment.getSpec().getFlinkConfiguration();
964+
config.put(AutoScalerOptions.AUTOSCALER_ENABLED.key(), "true");
965+
config.put(PipelineOptions.PARALLELISM_OVERRIDES.key(), v1 + ":1");
966+
967+
var specCopy = SpecUtils.clone(deployment.getSpec());
968+
955969
appReconciler.reconcile(ctxFactory.getResourceContext(deployment, context));
956970
verifyAndSetRunningJobsToStatus(deployment, flinkService.listJobs());
971+
deployment.setSpec(SpecUtils.clone(specCopy));
957972

958973
// Job is running; verify that no upgrade happens if the overrides are empty
959974
appReconciler.reconcile(ctxFactory.getResourceContext(deployment, context));
975+
deployment.setSpec(SpecUtils.clone(specCopy));
960976
assertEquals(
961977
ReconciliationState.DEPLOYED,
962978
deployment.getStatus().getReconciliationStatus().getState());
963979
assertEquals(RUNNING, deployment.getStatus().getJobStatus().getState());
964980

965981
// Test overrides are applied correctly
966-
var v1 = new JobVertexID();
967982
overrideFunction.set(
968983
s ->
969984
s.getFlinkConfiguration()
970985
.put(PipelineOptions.PARALLELISM_OVERRIDES.key(), v1 + ":2"));
971986

972987
appReconciler.reconcile(ctxFactory.getResourceContext(deployment, context));
988+
deployment.setSpec(SpecUtils.clone(specCopy));
973989
assertEquals(
974990
ReconciliationState.UPGRADING,
975991
deployment.getStatus().getReconciliationStatus().getState());
@@ -979,6 +995,55 @@ public void scale(KubernetesJobAutoScalerContext ctx) {
979995
.getResourceContext(deployment, context)
980996
.getObserveConfig()
981997
.get(PipelineOptions.PARALLELISM_OVERRIDES));
998+
999+
// Set the job into running state (scale up completed)
1000+
appReconciler.reconcile(ctxFactory.getResourceContext(deployment, context));
1001+
deployment.setSpec(SpecUtils.clone(specCopy));
1002+
verifyAndSetRunningJobsToStatus(deployment, flinkService.listJobs());
1003+
deployment.setSpec(SpecUtils.clone(specCopy));
1004+
1005+
// Make sure new reset nonce clears autoscaler
1006+
deployment.getSpec().getJob().setAutoscalerResetNonce(1L);
1007+
appReconciler.reconcile(ctxFactory.getResourceContext(deployment, context));
1008+
deployment.setSpec(SpecUtils.clone(specCopy));
1009+
assertEquals(
1010+
ReconciliationState.UPGRADING,
1011+
deployment.getStatus().getReconciliationStatus().getState());
1012+
assertEquals(
1013+
Map.of(v1.toHexString(), "1"),
1014+
ctxFactory
1015+
.getResourceContext(deployment, context)
1016+
.getObserveConfig()
1017+
.get(PipelineOptions.PARALLELISM_OVERRIDES));
1018+
assertEquals(
1019+
1L,
1020+
deployment
1021+
.getStatus()
1022+
.getReconciliationStatus()
1023+
.deserializeLastReconciledSpec()
1024+
.getJob()
1025+
.getAutoscalerResetNonce());
1026+
1027+
appReconciler.reconcile(ctxFactory.getResourceContext(deployment, context));
1028+
verifyAndSetRunningJobsToStatus(deployment, flinkService.listJobs());
1029+
deployment.setSpec(SpecUtils.clone(specCopy));
1030+
1031+
// Make sure the autoscaler reset nonce is properly updated even if no deployment happens
1032+
1033+
deployment.getSpec().getJob().setAutoscalerResetNonce(2L);
1034+
appReconciler.reconcile(ctxFactory.getResourceContext(deployment, context));
1035+
deployment.setSpec(SpecUtils.clone(specCopy));
1036+
assertEquals(
1037+
2L,
1038+
deployment
1039+
.getStatus()
1040+
.getReconciliationStatus()
1041+
.deserializeLastReconciledSpec()
1042+
.getJob()
1043+
.getAutoscalerResetNonce());
1044+
assertEquals(
1045+
ReconciliationState.DEPLOYED,
1046+
deployment.getStatus().getReconciliationStatus().getState());
9821047
}
9831048

9841049
@ParameterizedTest

helm/flink-kubernetes-operator/crds/flinkdeployments.flink.apache.org-v1.yml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -84,6 +84,8 @@ spec:
8484
items:
8585
type: string
8686
type: array
87+
autoscalerResetNonce:
88+
type: integer
8789
checkpointTriggerNonce:
8890
type: integer
8991
entryClass:

helm/flink-kubernetes-operator/crds/flinksessionjobs.flink.apache.org-v1.yml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,8 @@ spec:
4545
items:
4646
type: string
4747
type: array
48+
autoscalerResetNonce:
49+
type: integer
4850
checkpointTriggerNonce:
4951
type: integer
5052
entryClass:

0 commit comments

Comments
 (0)