Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -665,6 +665,58 @@ public InlineElement getDescription() {
.key()))
.build());

@Documentation.Section({
Documentation.Sections.EXPERT_SCHEDULING,
Documentation.Sections.ALL_JOB_MANAGER
})
public static final ConfigOption<Boolean> SCHEDULER_RESCALE_ACTIVE_CHECKPOINT_ENABLED =
key("jobmanager.adaptive-scheduler.rescale.active-checkpoint.enabled")
.booleanType()
.defaultValue(false)
.withDescription(
Description.builder()
.text(
"Controls whether the Adaptive Scheduler actively triggers a checkpoint when rescaling after desired resources become available. "
+ "When enabled, active triggers should respect %s and should not commence if a checkpoint is already in-flight.",
code(
CheckpointingOptions
.MIN_PAUSE_BETWEEN_CHECKPOINTS
.key()))
.build());

@Documentation.Section({
Documentation.Sections.EXPERT_SCHEDULING,
Documentation.Sections.ALL_JOB_MANAGER
})
public static final ConfigOption<Duration> SCHEDULER_RESCALE_ACTIVE_CHECKPOINT_TIMEOUT =
key("jobmanager.adaptive-scheduler.rescale.active-checkpoint.timeout")
.durationType()
.defaultValue(Duration.ofMinutes(5))
.withDescription(
"Maximum time to wait for the rescale-synchronizing checkpoint to complete before falling back to waiting for periodic checkpoints.");

@Documentation.Section({
Documentation.Sections.EXPERT_SCHEDULING,
Documentation.Sections.ALL_JOB_MANAGER
})
public static final ConfigOption<Integer> SCHEDULER_RESCALE_ACTIVE_CHECKPOINT_MAX_RETRIES =
key("jobmanager.adaptive-scheduler.rescale.active-checkpoint.max-retries")
.intType()
.defaultValue(0)
.withDescription(
"Maximum number of retries for actively triggering a rescale-synchronizing checkpoint before falling back to waiting for periodic checkpoints.");

@Documentation.Section({
Documentation.Sections.EXPERT_SCHEDULING,
Documentation.Sections.ALL_JOB_MANAGER
})
public static final ConfigOption<Duration> SCHEDULER_RESCALE_ACTIVE_CHECKPOINT_BACKOFF =
key("jobmanager.adaptive-scheduler.rescale.active-checkpoint.backoff")
.durationType()
.defaultValue(Duration.ofSeconds(5))
.withDescription(
"Backoff between retries when actively triggering a rescale-synchronizing checkpoint.");

/**
* @deprecated Use {@link
* JobManagerOptions#SCHEDULER_SUBMISSION_RESOURCE_STABILIZATION_TIMEOUT}.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -303,7 +303,15 @@ public static Settings of(
configuration.get(
SCHEDULER_RESCALE_TRIGGER_MAX_DELAY,
maximumDelayForRescaleTriggerDefault),
rescaleOnFailedCheckpointsCount);
rescaleOnFailedCheckpointsCount,
configuration.get(
JobManagerOptions.SCHEDULER_RESCALE_ACTIVE_CHECKPOINT_ENABLED),
configuration.get(
JobManagerOptions.SCHEDULER_RESCALE_ACTIVE_CHECKPOINT_TIMEOUT),
configuration.get(
JobManagerOptions.SCHEDULER_RESCALE_ACTIVE_CHECKPOINT_MAX_RETRIES),
configuration.get(
JobManagerOptions.SCHEDULER_RESCALE_ACTIVE_CHECKPOINT_BACKOFF));
}

private final SchedulerExecutionMode executionMode;
Expand All @@ -314,6 +322,10 @@ public static Settings of(
private final Duration executingResourceStabilizationTimeout;
private final Duration maximumDelayForTriggeringRescale;
private final int rescaleOnFailedCheckpointCount;
private final boolean rescaleActiveCheckpointEnabled;
private final Duration rescaleActiveCheckpointTimeout;
private final int rescaleActiveCheckpointMaxRetries;
private final Duration rescaleActiveCheckpointBackoff;

private Settings(
SchedulerExecutionMode executionMode,
Expand All @@ -323,7 +335,11 @@ private Settings(
Duration executingCooldownTimeout,
Duration executingResourceStabilizationTimeout,
Duration maximumDelayForTriggeringRescale,
int rescaleOnFailedCheckpointCount) {
int rescaleOnFailedCheckpointCount,
boolean rescaleActiveCheckpointEnabled,
Duration rescaleActiveCheckpointTimeout,
int rescaleActiveCheckpointMaxRetries,
Duration rescaleActiveCheckpointBackoff) {
this.executionMode = executionMode;
this.submissionResourceWaitTimeout = submissionResourceWaitTimeout;
this.submissionResourceStabilizationTimeout = submissionResourceStabilizationTimeout;
Expand All @@ -332,6 +348,10 @@ private Settings(
this.executingResourceStabilizationTimeout = executingResourceStabilizationTimeout;
this.maximumDelayForTriggeringRescale = maximumDelayForTriggeringRescale;
this.rescaleOnFailedCheckpointCount = rescaleOnFailedCheckpointCount;
this.rescaleActiveCheckpointEnabled = rescaleActiveCheckpointEnabled;
this.rescaleActiveCheckpointTimeout = rescaleActiveCheckpointTimeout;
this.rescaleActiveCheckpointMaxRetries = rescaleActiveCheckpointMaxRetries;
this.rescaleActiveCheckpointBackoff = rescaleActiveCheckpointBackoff;
}

public SchedulerExecutionMode getExecutionMode() {
Expand Down Expand Up @@ -365,6 +385,22 @@ public Duration getMaximumDelayForTriggeringRescale() {
public int getRescaleOnFailedCheckpointCount() {
return rescaleOnFailedCheckpointCount;
}

public boolean isRescaleActiveCheckpointEnabled() {
return rescaleActiveCheckpointEnabled;
}

public Duration getRescaleActiveCheckpointTimeout() {
return rescaleActiveCheckpointTimeout;
}

public int getRescaleActiveCheckpointMaxRetries() {
return rescaleActiveCheckpointMaxRetries;
}

public Duration getRescaleActiveCheckpointBackoff() {
return rescaleActiveCheckpointBackoff;
}
}

private final Settings settings;
Expand Down Expand Up @@ -1558,6 +1594,14 @@ public JobManagerJobMetricGroup getMetricGroup() {
return jobManagerJobMetricGroup;
}

// ----------------------------------------------------------------
// Executing.Context additions
// ----------------------------------------------------------------

public Settings getSettings() {
return settings;
}

@Override
public boolean isState(State expectedState) {
return expectedState == this.state;
Expand Down
Loading