From 23fdd6b0e004929ff30c02a5c0ccb9c7389ebda2 Mon Sep 17 00:00:00 2001
From: Samrat002
Date: Sat, 4 Oct 2025 14:42:55 +0530
Subject: [PATCH] [FLINK-36753] Adaptive Scheduler actively triggers a Checkpoint

---
 .../configuration/JobManagerOptions.java      |  52 +++++
 .../scheduler/adaptive/AdaptiveScheduler.java |  48 ++++-
 .../runtime/scheduler/adaptive/Executing.java | 200 +++++++++++++++++-
 3 files changed, 296 insertions(+), 4 deletions(-)

diff --git a/flink-core/src/main/java/org/apache/flink/configuration/JobManagerOptions.java b/flink-core/src/main/java/org/apache/flink/configuration/JobManagerOptions.java
index 85f048edd32ea..c2bf4c52b7b5e 100644
--- a/flink-core/src/main/java/org/apache/flink/configuration/JobManagerOptions.java
+++ b/flink-core/src/main/java/org/apache/flink/configuration/JobManagerOptions.java
@@ -665,6 +665,58 @@ public InlineElement getDescription() {
                                                         .key()))
                                         .build());
 
+    @Documentation.Section({
+        Documentation.Sections.EXPERT_SCHEDULING,
+        Documentation.Sections.ALL_JOB_MANAGER
+    })
+    public static final ConfigOption<Boolean> SCHEDULER_RESCALE_ACTIVE_CHECKPOINT_ENABLED =
+            key("jobmanager.adaptive-scheduler.rescale.active-checkpoint.enabled")
+                    .booleanType()
+                    .defaultValue(false)
+                    .withDescription(
+                            Description.builder()
+                                    .text(
+                                            "Controls whether the Adaptive Scheduler actively triggers a checkpoint when rescaling after the desired resources become available. "
+                                                    + "When enabled, the active trigger respects %s and is not issued while another checkpoint is already in flight.",
+                                            code(
+                                                    CheckpointingOptions
+                                                            .MIN_PAUSE_BETWEEN_CHECKPOINTS
+                                                            .key()))
+                                    .build());
+
+    @Documentation.Section({
+        Documentation.Sections.EXPERT_SCHEDULING,
+        Documentation.Sections.ALL_JOB_MANAGER
+    })
+    public static final ConfigOption<Duration> SCHEDULER_RESCALE_ACTIVE_CHECKPOINT_TIMEOUT =
+            key("jobmanager.adaptive-scheduler.rescale.active-checkpoint.timeout")
+                    .durationType()
+                    .defaultValue(Duration.ofMinutes(5))
+                    .withDescription(
+                            "Maximum time to wait for the rescale-synchronizing checkpoint to complete before falling back to waiting for periodic checkpoints.");
+
+    @Documentation.Section({
+        Documentation.Sections.EXPERT_SCHEDULING,
+        Documentation.Sections.ALL_JOB_MANAGER
+    })
+    public static final ConfigOption<Integer> SCHEDULER_RESCALE_ACTIVE_CHECKPOINT_MAX_RETRIES =
+            key("jobmanager.adaptive-scheduler.rescale.active-checkpoint.max-retries")
+                    .intType()
+                    .defaultValue(0)
+                    .withDescription(
+                            "Maximum number of retries for actively triggering a rescale-synchronizing checkpoint before falling back to waiting for periodic checkpoints.");
+
+    @Documentation.Section({
+        Documentation.Sections.EXPERT_SCHEDULING,
+        Documentation.Sections.ALL_JOB_MANAGER
+    })
+    public static final ConfigOption<Duration> SCHEDULER_RESCALE_ACTIVE_CHECKPOINT_BACKOFF =
+            key("jobmanager.adaptive-scheduler.rescale.active-checkpoint.backoff")
+                    .durationType()
+                    .defaultValue(Duration.ofSeconds(5))
+                    .withDescription(
+                            "Backoff between retries when actively triggering a rescale-synchronizing checkpoint.");
+
     /**
      * @deprecated Use {@link
      *     JobManagerOptions#SCHEDULER_SUBMISSION_RESOURCE_STABILIZATION_TIMEOUT}.
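Note (not part of the patch): as a quick reference for reviewers, the four options above could be set programmatically roughly as follows. This is a minimal sketch; the class name "ActiveRescaleCheckpointConfigExample" and the chosen values are illustrative only, while Configuration and JobManagerOptions are the existing Flink classes touched by this patch.

    import java.time.Duration;

    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.configuration.JobManagerOptions;

    /** Illustrative sketch only: builds a JobManager configuration with the new options enabled. */
    public class ActiveRescaleCheckpointConfigExample {

        public static Configuration build() {
            Configuration conf = new Configuration();
            // Let the adaptive scheduler trigger a checkpoint itself before rescaling.
            conf.set(JobManagerOptions.SCHEDULER_RESCALE_ACTIVE_CHECKPOINT_ENABLED, true);
            // Give up on the actively triggered checkpoint after 2 minutes and fall back
            // to waiting for the next periodic checkpoint.
            conf.set(JobManagerOptions.SCHEDULER_RESCALE_ACTIVE_CHECKPOINT_TIMEOUT, Duration.ofMinutes(2));
            // Retry a failed trigger twice, pausing 10 seconds between attempts.
            conf.set(JobManagerOptions.SCHEDULER_RESCALE_ACTIVE_CHECKPOINT_MAX_RETRIES, 2);
            conf.set(JobManagerOptions.SCHEDULER_RESCALE_ACTIVE_CHECKPOINT_BACKOFF, Duration.ofSeconds(10));
            return conf;
        }
    }

With the default values from the patch (disabled, 5 minute timeout, 0 retries, 5 second backoff), behaviour is unchanged unless the enabled flag is explicitly set.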
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveScheduler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveScheduler.java
index d8219d595428d..94e1b587309d9 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveScheduler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveScheduler.java
@@ -303,7 +303,15 @@ public static Settings of(
                         configuration.get(
                                 SCHEDULER_RESCALE_TRIGGER_MAX_DELAY,
                                 maximumDelayForRescaleTriggerDefault),
-                        rescaleOnFailedCheckpointsCount);
+                        rescaleOnFailedCheckpointsCount,
+                        configuration.get(
+                                JobManagerOptions.SCHEDULER_RESCALE_ACTIVE_CHECKPOINT_ENABLED),
+                        configuration.get(
+                                JobManagerOptions.SCHEDULER_RESCALE_ACTIVE_CHECKPOINT_TIMEOUT),
+                        configuration.get(
+                                JobManagerOptions.SCHEDULER_RESCALE_ACTIVE_CHECKPOINT_MAX_RETRIES),
+                        configuration.get(
+                                JobManagerOptions.SCHEDULER_RESCALE_ACTIVE_CHECKPOINT_BACKOFF));
         }
 
         private final SchedulerExecutionMode executionMode;
@@ -314,6 +322,10 @@ public static Settings of(
         private final Duration executingResourceStabilizationTimeout;
         private final Duration maximumDelayForTriggeringRescale;
         private final int rescaleOnFailedCheckpointCount;
+        private final boolean rescaleActiveCheckpointEnabled;
+        private final Duration rescaleActiveCheckpointTimeout;
+        private final int rescaleActiveCheckpointMaxRetries;
+        private final Duration rescaleActiveCheckpointBackoff;
 
         private Settings(
                 SchedulerExecutionMode executionMode,
@@ -323,7 +335,11 @@ private Settings(
                 Duration executingCooldownTimeout,
                 Duration executingResourceStabilizationTimeout,
                 Duration maximumDelayForTriggeringRescale,
-                int rescaleOnFailedCheckpointCount) {
+                int rescaleOnFailedCheckpointCount,
+                boolean rescaleActiveCheckpointEnabled,
+                Duration rescaleActiveCheckpointTimeout,
+                int rescaleActiveCheckpointMaxRetries,
+                Duration rescaleActiveCheckpointBackoff) {
             this.executionMode = executionMode;
             this.submissionResourceWaitTimeout = submissionResourceWaitTimeout;
             this.submissionResourceStabilizationTimeout = submissionResourceStabilizationTimeout;
@@ -332,6 +348,10 @@ private Settings(
             this.executingResourceStabilizationTimeout = executingResourceStabilizationTimeout;
             this.maximumDelayForTriggeringRescale = maximumDelayForTriggeringRescale;
             this.rescaleOnFailedCheckpointCount = rescaleOnFailedCheckpointCount;
+            this.rescaleActiveCheckpointEnabled = rescaleActiveCheckpointEnabled;
+            this.rescaleActiveCheckpointTimeout = rescaleActiveCheckpointTimeout;
+            this.rescaleActiveCheckpointMaxRetries = rescaleActiveCheckpointMaxRetries;
+            this.rescaleActiveCheckpointBackoff = rescaleActiveCheckpointBackoff;
         }
 
         public SchedulerExecutionMode getExecutionMode() {
@@ -365,6 +385,22 @@ public Duration getMaximumDelayForTriggeringRescale() {
         public int getRescaleOnFailedCheckpointCount() {
             return rescaleOnFailedCheckpointCount;
         }
+
+        public boolean isRescaleActiveCheckpointEnabled() {
+            return rescaleActiveCheckpointEnabled;
+        }
+
+        public Duration getRescaleActiveCheckpointTimeout() {
+            return rescaleActiveCheckpointTimeout;
+        }
+
+        public int getRescaleActiveCheckpointMaxRetries() {
+            return rescaleActiveCheckpointMaxRetries;
+        }
+
+        public Duration getRescaleActiveCheckpointBackoff() {
+            return rescaleActiveCheckpointBackoff;
+        }
     }
 
     private final Settings settings;
@@ -1558,6 +1594,14 @@ public JobManagerJobMetricGroup getMetricGroup() {
         return jobManagerJobMetricGroup;
     }
 
+    // ----------------------------------------------------------------
+    // Executing.Context additions
+    // ----------------------------------------------------------------
+
+    public Settings getSettings() {
+        return settings;
+    }
+
     @Override
     public boolean isState(State expectedState) {
         return expectedState == this.state;
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/Executing.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/Executing.java
index f699d13373e46..8a9efb96c1061 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/Executing.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/Executing.java
@@ -21,6 +21,7 @@
 import org.apache.flink.api.common.JobStatus;
 import org.apache.flink.core.execution.SavepointFormatType;
 import org.apache.flink.runtime.JobException;
+import org.apache.flink.runtime.checkpoint.CheckpointCoordinator;
 import org.apache.flink.runtime.checkpoint.CheckpointScheduling;
 import org.apache.flink.runtime.checkpoint.CheckpointStatsListener;
 import org.apache.flink.runtime.checkpoint.CompletedCheckpoint;
@@ -64,6 +65,13 @@ class Executing extends StateWithExecutionGraph
     // null indicates that there was no change event observed, yet
     @Nullable private AtomicInteger failedCheckpointCountdown;
 
+    private final boolean rescaleActiveCheckpointEnabled;
+    private final Duration rescaleActiveCheckpointTimeout;
+    private final int rescaleActiveCheckpointMaxRetries;
+    private final Duration rescaleActiveCheckpointBackoff;
+
+    private boolean waitingForRescaleCheckpoint;
+
     Executing(
             ExecutionGraph executionGraph,
             ExecutionGraphHandler executionGraphHandler,
@@ -74,7 +82,11 @@ class Executing extends StateWithExecutionGraph
             List<ExceptionHistoryEntry> failureCollection,
             Function<StateTransitionManager.Context, StateTransitionManager>
                     stateTransitionManagerFactory,
-            int rescaleOnFailedCheckpointCount) {
+            int rescaleOnFailedCheckpointCount,
+            boolean rescaleActiveCheckpointEnabled,
+            Duration rescaleActiveCheckpointTimeout,
+            int rescaleActiveCheckpointMaxRetries,
+            Duration rescaleActiveCheckpointBackoff) {
         super(
                 context,
                 executionGraph,
@@ -94,6 +106,11 @@ class Executing extends StateWithExecutionGraph
                 "The rescaleOnFailedCheckpointCount should be larger than 0.");
         this.rescaleOnFailedCheckpointCount = rescaleOnFailedCheckpointCount;
         this.failedCheckpointCountdown = null;
+        this.rescaleActiveCheckpointEnabled = rescaleActiveCheckpointEnabled;
+        this.rescaleActiveCheckpointTimeout = rescaleActiveCheckpointTimeout;
+        this.rescaleActiveCheckpointMaxRetries = rescaleActiveCheckpointMaxRetries;
+        this.rescaleActiveCheckpointBackoff = rescaleActiveCheckpointBackoff;
+        this.waitingForRescaleCheckpoint = false;
 
         deploy();
 
@@ -107,6 +124,34 @@ class Executing extends StateWithExecutionGraph
                 Duration.ZERO);
     }
 
+    // Backwards-compatible constructor for tests and existing call sites
+    Executing(
+            ExecutionGraph executionGraph,
+            ExecutionGraphHandler executionGraphHandler,
+            OperatorCoordinatorHandler operatorCoordinatorHandler,
+            Logger logger,
+            Context context,
+            ClassLoader userCodeClassLoader,
+            List<ExceptionHistoryEntry> failureCollection,
+            Function<StateTransitionManager.Context, StateTransitionManager>
+                    stateTransitionManagerFactory,
+            int rescaleOnFailedCheckpointCount) {
+        this(
+                executionGraph,
+                executionGraphHandler,
+                operatorCoordinatorHandler,
+                logger,
+                context,
+                userCodeClassLoader,
+                failureCollection,
+                stateTransitionManagerFactory,
+                rescaleOnFailedCheckpointCount,
+                false,
+                Duration.ZERO,
+                0,
+                Duration.ZERO);
+    }
+
     @Override
     public boolean hasSufficientResources() {
         return parallelismChanged() && context.hasSufficientResources();
@@ -149,6 +194,64 @@ public ScheduledFuture<?> scheduleOperation(Runnable callback, Duration delay) {
 
     @Override
     public void transitionToSubsequentState() {
+        if (!rescaleActiveCheckpointEnabled) {
+            proceedToRestartingImmediately();
+            return;
+        }
+
+        final ExecutionGraph executionGraph = getExecutionGraph();
+        if (executionGraph.getCheckpointCoordinatorConfiguration() == null
+                || !executionGraph
+                        .getCheckpointCoordinatorConfiguration()
+                        .isCheckpointingEnabled()) {
+            getLogger()
+                    .info(
+                            "Active checkpoint triggering disabled: checkpointing not enabled. Proceeding to rescale.");
+            proceedToRestartingImmediately();
+            return;
+        }
+
+        final CheckpointCoordinator checkpointCoordinator =
+                executionGraph.getCheckpointCoordinator();
+        if (checkpointCoordinator == null) {
+            getLogger()
+                    .info(
+                            "Active checkpoint triggering disabled: no CheckpointCoordinator. Proceeding to rescale.");
+            proceedToRestartingImmediately();
+            return;
+        }
+
+        if (waitingForRescaleCheckpoint) {
+            getLogger()
+                    .info(
+                            "Already awaiting active rescale checkpoint completion. Skipping new trigger.");
+            return;
+        }
+
+        if (checkpointCoordinator.isTriggering()
+                || checkpointCoordinator.getNumberOfPendingCheckpoints() > 0) {
+            getLogger()
+                    .info(
+                            "Skip active checkpoint trigger: checkpoint in flight. Waiting for completion with timeout {}.",
+                            rescaleActiveCheckpointTimeout);
+            waitingForRescaleCheckpoint = true;
+            scheduleRescaleTimeoutFallback();
+            return;
+        }
+
+        waitingForRescaleCheckpoint = true;
+        getLogger()
+                .info(
+                        "Triggering active checkpoint for rescale (respectMinPause={} timeout={} maxRetries={} backoff={}).",
+                        true,
+                        rescaleActiveCheckpointTimeout,
+                        rescaleActiveCheckpointMaxRetries,
+                        rescaleActiveCheckpointBackoff);
+
+        attemptActiveCheckpointWithRetries(0);
+    }
+
+    private void proceedToRestartingImmediately() {
         context.goToRestarting(
                 getExecutionGraph(),
                 getExecutionGraphHandler(),
@@ -162,6 +265,90 @@ public void transitionToSubsequentState() {
                 getFailures());
     }
 
+    private void attemptActiveCheckpointWithRetries(int attempt) {
+        final ExecutionGraph executionGraph = getExecutionGraph();
+        final CheckpointCoordinator checkpointCoordinator =
+                executionGraph.getCheckpointCoordinator();
+
+        if (checkpointCoordinator == null) {
+            getLogger().info("No CheckpointCoordinator available. Proceeding to rescale.");
+            waitingForRescaleCheckpoint = false;
+            proceedToRestartingImmediately();
+            return;
+        }
+
+        final java.util.concurrent.CompletableFuture<CompletedCheckpoint> future =
+                checkpointCoordinator.triggerCheckpoint(true);
+        final ScheduledFuture<?> timeoutFuture =
+                context.runIfState(
+                        this,
+                        () -> {
+                            if (waitingForRescaleCheckpoint) {
+                                getLogger()
+                                        .warn(
+                                                "Active rescale checkpoint timed out after {}. Falling back to periodic checkpoint.",
+                                                rescaleActiveCheckpointTimeout);
+                                waitingForRescaleCheckpoint = false;
+                                stateTransitionManager.onTrigger();
+                            }
+                        },
+                        rescaleActiveCheckpointTimeout);
+
+        future.whenCompleteAsync(
+                (CompletedCheckpoint completed, Throwable error) -> {
+                    timeoutFuture.cancel(false);
+                    if (!waitingForRescaleCheckpoint) {
+                        return;
+                    }
+                    if (error == null) {
+                        getLogger()
+                                .info(
+                                        "Active rescale checkpoint {} completed. Proceeding to rescale.",
+                                        completed.getCheckpointID());
+                        waitingForRescaleCheckpoint = false;
+                        proceedToRestartingImmediately();
+                    } else {
+                        if (attempt < rescaleActiveCheckpointMaxRetries) {
+                            getLogger()
+                                    .warn(
+                                            "Active rescale checkpoint failed (attempt {}/{}). Retrying after {}. Reason: {}",
+                                            attempt + 1,
+                                            rescaleActiveCheckpointMaxRetries,
+                                            rescaleActiveCheckpointBackoff,
+                                            error.toString());
+                            context.runIfState(
+                                    this,
+                                    () -> attemptActiveCheckpointWithRetries(attempt + 1),
+                                    rescaleActiveCheckpointBackoff);
+                        } else {
+                            getLogger()
+                                    .warn(
+                                            "Active rescale checkpoint failed after {} attempts. Falling back to periodic checkpoint.",
+                                            rescaleActiveCheckpointMaxRetries);
+                            waitingForRescaleCheckpoint = false;
+                            stateTransitionManager.onTrigger();
+                        }
+                    }
+                },
+                context.getMainThreadExecutor());
+    }
+
+    private void scheduleRescaleTimeoutFallback() {
+        context.runIfState(
+                this,
+                () -> {
+                    if (waitingForRescaleCheckpoint) {
+                        getLogger()
+                                .warn(
+                                        "Waiting for in-flight checkpoint exceeded {}. Falling back to periodic checkpoint.",
+                                        rescaleActiveCheckpointTimeout);
+                        waitingForRescaleCheckpoint = false;
+                        stateTransitionManager.onTrigger();
+                    }
+                },
+                rescaleActiveCheckpointTimeout);
+    }
+
     @Override
     public JobStatus getJobStatus() {
         return JobStatus.RUNNING;
@@ -332,6 +519,11 @@ FailureResult howToHandleFailure(
          * @return {@code true} if we have sufficient resources; otherwise {@code false}
          */
        boolean hasSufficientResources();
+
+        /** Provides scheduler settings for active checkpoint triggering. */
+        default AdaptiveScheduler.Settings getSettings() {
+            return null;
+        }
     }
 
     static class Factory implements StateFactory<Executing> {
@@ -383,7 +575,11 @@ public Executing getState() {
                     userCodeClassLoader,
                     failureCollection,
                     stateTransitionManagerFactory,
-                    rescaleOnFailedCheckpointCount);
+                    rescaleOnFailedCheckpointCount,
+                    context.getSettings().isRescaleActiveCheckpointEnabled(),
+                    context.getSettings().getRescaleActiveCheckpointTimeout(),
+                    context.getSettings().getRescaleActiveCheckpointMaxRetries(),
+                    context.getSettings().getRescaleActiveCheckpointBackoff());
         }
     }
 }