Skip to content

Commit 4aedee9

Browse files
authored
Improve lag-based autoscaler config persistence (#18745)
Changes: * Add `Supervisor.merge()` to merge task count from existing running supervisor * Fix up priority of `taskCount` vs `taskCountStart` vs `taskCountMin`
1 parent eaa5dc3 commit 4aedee9

File tree

8 files changed

+404
-60
lines changed

8 files changed

+404
-60
lines changed

indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorManager.java

Lines changed: 17 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -174,7 +174,10 @@ public boolean createOrUpdateAndStartSupervisor(SupervisorSpec spec)
174174
synchronized (lock) {
175175
Preconditions.checkState(started, "SupervisorManager not started");
176176
final boolean shouldUpdateSpec = shouldUpdateSupervisor(spec);
177-
possiblyStopAndRemoveSupervisorInternal(spec.getId(), false);
177+
SupervisorSpec existingSpec = possiblyStopAndRemoveSupervisorInternal(spec.getId(), false);
178+
if (existingSpec != null) {
179+
spec.merge(existingSpec);
180+
}
178181
createAndStartSupervisorInternal(spec, shouldUpdateSpec);
179182
return shouldUpdateSpec;
180183
}
@@ -183,6 +186,7 @@ public boolean createOrUpdateAndStartSupervisor(SupervisorSpec spec)
183186
/**
184187
* Checks whether the submitted SupervisorSpec differs from the current spec in SupervisorManager's supervisor list.
185188
* This is used in SupervisorResource specPost to determine whether the Supervisor needs to be restarted
189+
*
186190
* @param spec The spec submitted
187191
* @return boolean - true only if the spec has been modified, false otherwise
188192
*/
@@ -221,7 +225,7 @@ public boolean stopAndRemoveSupervisor(String id)
221225

222226
synchronized (lock) {
223227
Preconditions.checkState(started, "SupervisorManager not started");
224-
return possiblyStopAndRemoveSupervisorInternal(id, true);
228+
return possiblyStopAndRemoveSupervisorInternal(id, true) != null;
225229
}
226230
}
227231

@@ -299,7 +303,8 @@ public void stop()
299303
log.info("SupervisorManager stopped.");
300304
}
301305

302-
public List<VersionedSupervisorSpec> getSupervisorHistoryForId(String id, @Nullable Integer limit) throws IllegalArgumentException
306+
public List<VersionedSupervisorSpec> getSupervisorHistoryForId(String id, @Nullable Integer limit)
307+
throws IllegalArgumentException
303308
{
304309
return metadataSupervisorManager.getAllForId(id, limit);
305310
}
@@ -429,13 +434,14 @@ public boolean registerUpgradedPendingSegmentOnSupervisor(
429434
* Caller should have acquired [lock] before invoking this method to avoid contention with other threads that may be
430435
* starting, stopping, suspending and resuming supervisors.
431436
*
432-
* @return true if a supervisor was stopped, false if there was no supervisor with this id
437+
* @return reference to existing supervisor, if exists and was stopped, null if there was no supervisor with this id
433438
*/
434-
private boolean possiblyStopAndRemoveSupervisorInternal(String id, boolean writeTombstone)
439+
@Nullable
440+
private SupervisorSpec possiblyStopAndRemoveSupervisorInternal(String id, boolean writeTombstone)
435441
{
436442
Pair<Supervisor, SupervisorSpec> pair = supervisors.get(id);
437-
if (pair == null) {
438-
return false;
443+
if (pair == null || pair.rhs == null || pair.lhs == null) {
444+
return null;
439445
}
440446

441447
if (writeTombstone) {
@@ -447,13 +453,13 @@ private boolean possiblyStopAndRemoveSupervisorInternal(String id, boolean write
447453
pair.lhs.stop(true);
448454
supervisors.remove(id);
449455

450-
SupervisorTaskAutoScaler autoscler = autoscalers.get(id);
451-
if (autoscler != null) {
452-
autoscler.stop();
456+
SupervisorTaskAutoScaler autoscaler = autoscalers.get(id);
457+
if (autoscaler != null) {
458+
autoscaler.stop();
453459
autoscalers.remove(id);
454460
}
455461

456-
return true;
462+
return pair.rhs;
457463
}
458464

459465
/**

indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java

Lines changed: 17 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -135,8 +135,8 @@
135135
import java.util.stream.Stream;
136136

137137
/**
138-
* this class is the parent class of both the Kafka and Kinesis supervisor. All the main run loop
139-
* logic are similar enough so they're grouped together into this class.
138+
* This class is the parent class of both the Kafka and Kinesis supervisor. All the main run loop
139+
* logic is similar enough, so they're grouped together into this class.
140140
* <p>
141141
* Supervisor responsible for managing the SeekableStreamIndexTasks (Kafka/Kinesis) for a single dataSource. At a high level, the class accepts a
142142
* {@link SeekableStreamSupervisorSpec} which includes the stream name (topic / stream) and configuration as well as an ingestion spec which will
@@ -541,10 +541,20 @@ public String getType()
541541

542542
/**
543543
* This method determines how to do scale actions based on collected lag points.
544-
* If scale action is triggered :
545-
* First of all, call gracefulShutdownInternal() which will change the state of current datasource ingest tasks from reading to publishing.
546-
* Secondly, clear all the stateful data structures: activelyReadingTaskGroups, partitionGroups, partitionOffsets, pendingCompletionTaskGroups, partitionIds. These structures will be rebuiled in the next 'RunNotice'.
547-
* Finally, change the taskCount in SeekableStreamSupervisorIOConfig and sync it to MetadataStorage.
544+
* If scale action is triggered:
545+
* <ul>
546+
* <li>First, call <code>gracefulShutdownInternal()</code> which will change the state of current datasource ingest tasks from reading to publishing.
547+
* <li>Secondly, clear all the stateful data structures:
548+
* <ul>
549+
* <li><code>activelyReadingTaskGroups</code>,
550+
* <li><code>partitionGroups</code>,
551+
* <li><code>partitionOffsets</code>,
552+
* <li><code>pendingCompletionTaskGroups</code>,
553+
* <li><code>partitionIds</code>.
554+
* </ul>
555+
* These structures will be rebuiled in the next 'RunNotice'.
556+
* <li>Finally, change the <code>taskCount</code> in <code>SeekableStreamSupervisorIOConfig</code> and sync it to <code>MetadataStorage</code>.
557+
* </ul>
548558
* After the taskCount is changed in SeekableStreamSupervisorIOConfig, next RunNotice will create scaled number of ingest tasks without resubmitting the supervisor.
549559
*
550560
* @param desiredActiveTaskCount desired taskCount computed from AutoScaler
@@ -916,7 +926,7 @@ public String getType()
916926
private volatile boolean lifecycleStarted = false;
917927
private final ServiceEmitter emitter;
918928

919-
// snapshots latest sequences from stream to be verified in next run cycle of inactive stream check
929+
// snapshots latest sequences from the stream to be verified in the next run cycle of inactive stream check
920930
private final Map<PartitionIdType, SequenceOffsetType> previousSequencesFromStream = new HashMap<>();
921931
private long lastActiveTimeMillis;
922932
private final IdleConfig idleConfig;

indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorSpec.java

Lines changed: 39 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -43,16 +43,18 @@
4343
import org.apache.druid.segment.indexing.DataSchema;
4444

4545
import javax.annotation.Nullable;
46+
import javax.validation.constraints.NotNull;
4647
import java.util.List;
4748
import java.util.Map;
4849

4950
public abstract class SeekableStreamSupervisorSpec implements SupervisorSpec
5051
{
51-
protected static final String ILLEGAL_INPUT_SOURCE_UPDATE_ERROR_MESSAGE = "Update of the input source stream from [%s] to [%s] is not supported for a running supervisor."
52-
+ "%nTo perform the update safely, follow these steps:"
53-
+ "%n(1) Suspend this supervisor, reset its offsets and then terminate it. "
54-
+ "%n(2) Create a new supervisor with the new input source stream."
55-
+ "%nNote that doing the reset can cause data duplication or loss if any topic used in the old supervisor is included in the new one too.";
52+
protected static final String ILLEGAL_INPUT_SOURCE_UPDATE_ERROR_MESSAGE =
53+
"Update of the input source stream from [%s] to [%s] is not supported for a running supervisor."
54+
+ "%nTo perform the update safely, follow these steps:"
55+
+ "%n(1) Suspend this supervisor, reset its offsets and then terminate it. "
56+
+ "%n(2) Create a new supervisor with the new input source stream."
57+
+ "%nNote that doing the reset can cause data duplication or loss if any topic used in the old supervisor is included in the new one too.";
5658

5759
private static SeekableStreamSupervisorIngestionSpec checkIngestionSchema(
5860
SeekableStreamSupervisorIngestionSpec ingestionSchema
@@ -183,6 +185,7 @@ public DruidMonitorSchedulerConfig getMonitorSchedulerConfig()
183185

184186
/**
185187
* An autoScaler instance will be returned depending on the autoScalerConfig. In case autoScalerConfig is null or autoScaler is disabled then NoopTaskAutoScaler will be returned.
188+
*
186189
* @param supervisor
187190
* @return autoScaler
188191
*/
@@ -232,6 +235,7 @@ public boolean isSuspended()
232235
* <li>You cannot migrate between types of supervisors.</li>
233236
* <li>You cannot change the input source stream of a running supervisor.</li>
234237
* </ul>
238+
*
235239
* @param proposedSpec the proposed supervisor spec
236240
* @throws DruidException if the proposed spec update is not allowed
237241
*/
@@ -240,7 +244,9 @@ public void validateSpecUpdateTo(SupervisorSpec proposedSpec) throws DruidExcept
240244
{
241245
if (!(proposedSpec instanceof SeekableStreamSupervisorSpec)) {
242246
throw InvalidInput.exception(
243-
"Cannot update supervisor spec from type[%s] to type[%s]", getClass().getSimpleName(), proposedSpec.getClass().getSimpleName()
247+
"Cannot update supervisor spec from type[%s] to type[%s]",
248+
getClass().getSimpleName(),
249+
proposedSpec.getClass().getSimpleName()
244250
);
245251
}
246252
SeekableStreamSupervisorSpec other = (SeekableStreamSupervisorSpec) proposedSpec;
@@ -255,6 +261,33 @@ public void validateSpecUpdateTo(SupervisorSpec proposedSpec) throws DruidExcept
255261
}
256262
}
257263

264+
@Override
265+
public void merge(@NotNull SupervisorSpec existingSpec)
266+
{
267+
AutoScalerConfig thisAutoScalerConfig = this.getIoConfig().getAutoScalerConfig();
268+
// Either if autoscaler is absent or taskCountStart is specified - just return.
269+
if (thisAutoScalerConfig == null || thisAutoScalerConfig.getTaskCountStart() != null) {
270+
return;
271+
}
272+
273+
// Use a switch expression with pattern matching when we move to Java 21 as a minimum requirement.
274+
if (existingSpec instanceof SeekableStreamSupervisorSpec) {
275+
SeekableStreamSupervisorSpec spec = (SeekableStreamSupervisorSpec) existingSpec;
276+
AutoScalerConfig autoScalerConfig = spec.getIoConfig().getAutoScalerConfig();
277+
if (autoScalerConfig == null) {
278+
return;
279+
}
280+
// provided `taskCountStart` > provided `taskCount` > existing `taskCount` > provided `taskCountMin`.
281+
int taskCount = thisAutoScalerConfig.getTaskCountMin();
282+
if (this.getIoConfig().getTaskCount() != null) {
283+
taskCount = this.getIoConfig().getTaskCount();
284+
} else if (spec.getIoConfig().getTaskCount() != null) {
285+
taskCount = spec.getIoConfig().getTaskCount();
286+
}
287+
this.getIoConfig().setTaskCount(taskCount);
288+
}
289+
}
290+
258291
protected abstract SeekableStreamSupervisorSpec toggleSuspend(boolean suspend);
259292

260293
}

indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/LagBasedAutoScaler.java

Lines changed: 18 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -124,8 +124,10 @@ public void start()
124124
);
125125
log.info(
126126
"LagBasedAutoScaler will collect lag every [%d] millis and will keep up to [%d] data points for the last [%d] millis for dataSource [%s]",
127-
lagBasedAutoScalerConfig.getLagCollectionIntervalMillis(), lagMetricsQueue.maxSize(),
128-
lagBasedAutoScalerConfig.getLagCollectionRangeMillis(), dataSource
127+
lagBasedAutoScalerConfig.getLagCollectionIntervalMillis(),
128+
lagMetricsQueue.maxSize(),
129+
lagBasedAutoScalerConfig.getLagCollectionRangeMillis(),
130+
dataSource
129131
);
130132
}
131133

@@ -192,19 +194,25 @@ private Runnable computeAndCollectLag()
192194

193195
/**
194196
* This method determines whether to do scale actions based on collected lag points.
195-
* Current algorithm of scale is simple:
196-
* First of all, compute the proportion of lag points higher/lower than scaleOutThreshold/scaleInThreshold, getting scaleOutThreshold/scaleInThreshold.
197-
* Secondly, compare scaleOutThreshold/scaleInThreshold with triggerScaleOutFractionThreshold/triggerScaleInFractionThreshold. P.S. Scale out action has higher priority than scale in action.
198-
* Finaly, if scaleOutThreshold/scaleInThreshold is higher than triggerScaleOutFractionThreshold/triggerScaleInFractionThreshold, scale out/in action would be triggered.
197+
* The current algorithm of scale is straightforward:
198+
* <ul>
199+
* <li>First, compute the proportion of lag points higher/lower than {@code scaleOutThreshold/scaleInThreshold},
200+
* getting {@code scaleInThreshold/scaleOutThreshold},.
201+
* <li>Secondly, compare {@code scaleInThreshold/scaleOutThreshold} with
202+
* {@code triggerScaleOutFractionThreshold/triggerScaleInFractionThreshold}.
203+
* <ul><li>P.S. Scale out action has a higher priority than scale in action.</ul>
204+
* <li>Finally, if {@code scaleOutThreshold/scaleInThreshold}, is higher than
205+
* {@code triggerScaleOutFractionThreshold/triggerScaleInFractionThreshold}, scale out/in action would be triggered.
206+
* </ul>
199207
*
200-
* @param lags the lag metrics of Stream(Kafka/Kinesis)
201-
* @return Integer. target number of tasksCount, -1 means skip scale action.
208+
* @param lags the lag metrics of Stream (Kafka/Kinesis)
209+
* @return Integer, target number of tasksCount. -1 means skip scale action.
202210
*/
203211
private int computeDesiredTaskCount(List<Long> lags)
204212
{
205-
// if supervisor is not suspended, ensure required tasks are running
213+
// if the supervisor is not suspended, ensure required tasks are running
206214
// if suspended, ensure tasks have been requested to gracefully stop
207-
log.debug("Computing desired task count for [%s], based on following lags : [%s]", dataSource, lags);
215+
log.debug("Computing the desired task count for [%s], based on following lags : [%s]", dataSource, lags);
208216
int beyond = 0;
209217
int within = 0;
210218
int metricsCount = lags.size();

indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/LagBasedAutoScalerConfig.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -103,8 +103,8 @@ public LagBasedAutoScalerConfig(
103103

104104
this.scaleInStep = scaleInStep != null ? scaleInStep : 1;
105105
this.scaleOutStep = scaleOutStep != null ? scaleOutStep : 2;
106-
this.minTriggerScaleActionFrequencyMillis = minTriggerScaleActionFrequencyMillis
107-
!= null ? minTriggerScaleActionFrequencyMillis : 600000;
106+
this.minTriggerScaleActionFrequencyMillis =
107+
minTriggerScaleActionFrequencyMillis != null ? minTriggerScaleActionFrequencyMillis : 600000;
108108

109109
Preconditions.checkArgument(
110110
stopTaskCountRatio == null || (stopTaskCountRatio > 0.0 && stopTaskCountRatio <= 1.0),

0 commit comments

Comments
 (0)