Skip to content

Commit 26b82e7

Browse files
committed
Change a logic according to a new decision
1 parent ba27c5c commit 26b82e7

File tree

9 files changed

+274
-65
lines changed

9 files changed

+274
-65
lines changed

indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorManager.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -176,7 +176,7 @@ public boolean createOrUpdateAndStartSupervisor(SupervisorSpec spec)
176176
final boolean shouldUpdateSpec = shouldUpdateSupervisor(spec);
177177
SupervisorSpec existingSpec = possiblyStopAndRemoveSupervisorInternal(spec.getId(), false);
178178
if (existingSpec != null) {
179-
spec.mergeSpecConfigs(existingSpec);
179+
spec.merge(existingSpec);
180180
}
181181
createAndStartSupervisorInternal(spec, shouldUpdateSpec);
182182
return shouldUpdateSpec;

indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -611,7 +611,8 @@ private void changeTaskCountInAutoScalerConfig(int desiredActiveTaskCount)
611611
log.warn("autoScalerConfig is null but scale action is submitted, how can it be ?");
612612
return;
613613
}
614-
autoScalerConfig.setTaskCountStart(desiredActiveTaskCount);
614+
ioConfig.setTaskCount(desiredActiveTaskCount);
615+
// TODO: autoScalerConfig.setTaskCountStart(desiredActiveTaskCount);
615616
try {
616617
Optional<SupervisorManager> supervisorManager = taskMaster.getSupervisorManager();
617618
if (supervisorManager.isPresent()) {
@@ -1179,7 +1180,7 @@ public void stop(boolean stopGracefully)
11791180
catch (Exception e) {
11801181
stateManager.recordThrowableEvent(e);
11811182
log.makeAlert(e, "Exception stopping [%s]", supervisorId)
1182-
.emit();
1183+
.emit();
11831184
}
11841185
}
11851186
}
@@ -1357,7 +1358,7 @@ public void tryInit()
13571358
}
13581359
initRetryCounter++;
13591360
log.makeAlert(e, "Exception starting SeekableStreamSupervisor[%s]", supervisorId)
1360-
.emit();
1361+
.emit();
13611362

13621363
throw new RuntimeException(e);
13631364
}

indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorSpec.java

Lines changed: 13 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -263,29 +263,30 @@ public void validateSpecUpdateTo(SupervisorSpec proposedSpec) throws DruidExcept
263263
}
264264
}
265265

266-
/**
267-
* Writes the <code>taskCountStart</code> value from old config, if not specificed in new config.
268-
*
269-
* @param existingSpec the existing supervisor specification to merge configuration values from
270-
*/
271266
@Override
272-
public void mergeSpecConfigs(@NotNull SupervisorSpec existingSpec)
267+
public void merge(@NotNull SupervisorSpec existingSpec)
273268
{
274269
AutoScalerConfig thisAutoScalerConfig = this.getIoConfig().getAutoScalerConfig();
275270
// Either if autoscaler is absent or taskCountStart is specified - just return.
276271
if (thisAutoScalerConfig == null || thisAutoScalerConfig.getTaskCountStart() != null) {
277272
return;
278273
}
279274

280-
// TODO[sasha]: use switch expression with pattern matching when we move to Java 21 as minimum requirement.
275+
// Use a switch expression with pattern matching when we move to Java 21 as a minimum requirement.
281276
if (existingSpec instanceof SeekableStreamSupervisorSpec) {
282-
// Note: for some reason, sources are available only for bytecode version 11.
283-
//noinspection PatternVariableCanBeUsed
284-
var spec = (SeekableStreamSupervisorSpec) existingSpec;
277+
SeekableStreamSupervisorSpec spec = (SeekableStreamSupervisorSpec) existingSpec;
285278
AutoScalerConfig autoScalerConfig = spec.getIoConfig().getAutoScalerConfig();
286-
if (autoScalerConfig != null && autoScalerConfig.getTaskCountStart() != null) {
287-
thisAutoScalerConfig.setTaskCountStart(autoScalerConfig.getTaskCountStart());
279+
if (autoScalerConfig == null) {
280+
return;
281+
}
282+
// provided `taskCountStart` > provided `taskCount` > existing `taskCount` > provided `taskCountMin`.
283+
int taskCount = thisAutoScalerConfig.getTaskCountMin();
284+
if (this.getIoConfig().getTaskCount() != null) {
285+
taskCount = this.getIoConfig().getTaskCount();
286+
} else if (spec.getIoConfig().getTaskCount() != null) {
287+
taskCount = spec.getIoConfig().getTaskCount();
288288
}
289+
this.getIoConfig().setTaskCount(taskCount);
289290
}
290291
}
291292

indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/AutoScalerConfig.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,6 @@ public interface AutoScalerConfig
4040
int getTaskCountMax();
4141
int getTaskCountMin();
4242
Integer getTaskCountStart();
43-
void setTaskCountStart(int taskCountStart);
4443
Double getStopTaskCountRatio();
4544
SupervisorTaskAutoScaler createAutoScaler(Supervisor supervisor, SupervisorSpec spec, ServiceEmitter emitter);
4645
}

indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/LagBasedAutoScaler.java

Lines changed: 26 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -69,7 +69,8 @@ public LagBasedAutoScaler(
6969
.getLagCollectionIntervalMillis()) + 1;
7070
this.lagMetricsQueue = new CircularFifoQueue<>(slots);
7171
this.allocationExec = Execs.scheduledSingleThreaded(StringUtils.encodeForFormat(supervisorId) + "-Allocation-%d");
72-
this.lagComputationExec = Execs.scheduledSingleThreaded(StringUtils.encodeForFormat(supervisorId) + "-Computation-%d");
72+
this.lagComputationExec = Execs.scheduledSingleThreaded(StringUtils.encodeForFormat(supervisorId)
73+
+ "-Computation-%d");
7374
this.spec = spec;
7475
this.supervisor = supervisor;
7576
this.emitter = emitter;
@@ -124,8 +125,10 @@ public void start()
124125
);
125126
log.info(
126127
"LagBasedAutoScaler will collect lag every [%d] millis and will keep up to [%d] data points for the last [%d] millis for dataSource [%s]",
127-
lagBasedAutoScalerConfig.getLagCollectionIntervalMillis(), lagMetricsQueue.maxSize(),
128-
lagBasedAutoScalerConfig.getLagCollectionRangeMillis(), dataSource
128+
lagBasedAutoScalerConfig.getLagCollectionIntervalMillis(),
129+
lagMetricsQueue.maxSize(),
130+
lagBasedAutoScalerConfig.getLagCollectionRangeMillis(),
131+
dataSource
129132
);
130133
}
131134

@@ -190,27 +193,27 @@ private Runnable computeAndCollectLag()
190193
};
191194
}
192195

193-
/**
194-
* This method determines whether to do scale actions based on collected lag points.
195-
* The current algorithm of scale is straightforward:
196-
* <ul>
197-
* <li>First, compute the proportion of lag points higher/lower than <code>scaleOutThreshold</code>/<code>scaleInThreshold</code>,
198-
* getting <code>scaleOutThreshold</code>/<code>scaleInThreshold</code>.
199-
* <li>Secondly, compare <code>scaleOutThreshold</code>/<code>scaleInThreshold</code> with
200-
* <code>triggerScaleOutFractionThreshold</code>/<code>triggerScaleInFractionThreshold</code>.
201-
* <ul><li>P.S. Scale out action has a higher priority than scale in action.</ul>
202-
* <li>Finally, if <code>scaleOutThreshold</code>/<code>scaleInThreshold</code> is higher than
203-
* <code>triggerScaleOutFractionThreshold</code>/<code>triggerScaleInFractionThreshold</code>, scale out/in action would be triggered.
204-
* </ul>
205-
*
206-
* @param lags the lag metrics of Stream (Kafka/Kinesis)
207-
* @return Integer, target number of tasksCount. -1 means skip scale action.
208-
*/
196+
/**
197+
* This method determines whether to do scale actions based on collected lag points.
198+
* The current algorithm of scale is straightforward:
199+
* <ul>
200+
* <li>First, compute the proportion of lag points higher/lower than {@code scaleOutThreshold/scaleInThreshold},
201+
* getting {@code scaleOutThreshold/scaleInThreshold}.
202+
* <li>Secondly, compare {@code scaleOutThreshold/scaleInThreshold} with
203+
* {@code triggerScaleOutFractionThreshold/triggerScaleInFractionThreshold}.
204+
* <ul><li>P.S. Scale out action has a higher priority than scale in action.</ul>
205+
* <li>Finally, if {@code scaleOutThreshold/scaleInThreshold} is higher than
206+
* {@code triggerScaleOutFractionThreshold/triggerScaleInFractionThreshold}, scale out/in action would be triggered.
207+
* </ul>
208+
*
209+
* @param lags the lag metrics of Stream (Kafka/Kinesis)
210+
* @return Integer, target number of tasksCount. -1 means skip scale action.
211+
*/
209212
private int computeDesiredTaskCount(List<Long> lags)
210213
{
211-
// if supervisor is not suspended, ensure required tasks are running
214+
// if the supervisor is not suspended, ensure required tasks are running
212215
// if suspended, ensure tasks have been requested to gracefully stop
213-
log.debug("Computing desired task count for [%s], based on following lags : [%s]", dataSource, lags);
216+
log.debug("Computing the desired task count for [%s], based on following lags : [%s]", dataSource, lags);
214217
int beyond = 0;
215218
int within = 0;
216219
int metricsCount = lags.size();
@@ -264,7 +267,8 @@ private int computeDesiredTaskCount(List<Long> lags)
264267
int taskCount = currentActiveTaskCount - lagBasedAutoScalerConfig.getScaleInStep();
265268
int actualTaskCountMin = Math.min(lagBasedAutoScalerConfig.getTaskCountMin(), partitionCount);
266269
if (currentActiveTaskCount == actualTaskCountMin) {
267-
log.debug("CurrentActiveTaskCount reached task count Min limit, skipping scale in action for dataSource[%s].",
270+
log.debug(
271+
"CurrentActiveTaskCount reached task count Min limit, skipping scale in action for dataSource[%s].",
268272
dataSource
269273
);
270274
emitter.emit(metricBuilder

indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/LagBasedAutoScalerConfig.java

Lines changed: 1 addition & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -104,7 +104,7 @@ public LagBasedAutoScalerConfig(
104104
this.scaleInStep = scaleInStep != null ? scaleInStep : 1;
105105
this.scaleOutStep = scaleOutStep != null ? scaleOutStep : 2;
106106
this.minTriggerScaleActionFrequencyMillis = minTriggerScaleActionFrequencyMillis
107-
!= null ? minTriggerScaleActionFrequencyMillis : 600000;
107+
!= null ? minTriggerScaleActionFrequencyMillis : 600000;
108108

109109
Preconditions.checkArgument(
110110
stopTaskCountRatio == null || (stopTaskCountRatio > 0.0 && stopTaskCountRatio <= 1.0),
@@ -183,12 +183,6 @@ public Integer getTaskCountStart()
183183
return taskCountStart;
184184
}
185185

186-
@Override
187-
public void setTaskCountStart(int taskCountStart)
188-
{
189-
this.taskCountStart = taskCountStart;
190-
}
191-
192186
@Override
193187
public SupervisorTaskAutoScaler createAutoScaler(Supervisor supervisor, SupervisorSpec spec, ServiceEmitter emitter)
194188
{

indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamSupervisorSpecTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1659,7 +1659,7 @@ public void testMergeSpecConfigs()
16591659
Assert.assertNull(newSpec.getIoConfig().getAutoScalerConfig().getTaskCountStart());
16601660

16611661
// When
1662-
newSpec.mergeSpecConfigs(existingSpec);
1662+
newSpec.merge(existingSpec);
16631663

16641664
// Then - taskCountStart should be copied from existing spec
16651665
Assert.assertEquals(Integer.valueOf(5), newSpec.getIoConfig().getAutoScalerConfig().getTaskCountStart());

0 commit comments

Comments
 (0)