Skip to content

Commit fa86de5

Browse files
committed
Change a logic according to a new decision
1 parent ba27c5c commit fa86de5

File tree

9 files changed

+286
-80
lines changed

9 files changed

+286
-80
lines changed

indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorManager.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -176,7 +176,7 @@ public boolean createOrUpdateAndStartSupervisor(SupervisorSpec spec)
176176
final boolean shouldUpdateSpec = shouldUpdateSupervisor(spec);
177177
SupervisorSpec existingSpec = possiblyStopAndRemoveSupervisorInternal(spec.getId(), false);
178178
if (existingSpec != null) {
179-
spec.mergeSpecConfigs(existingSpec);
179+
spec.merge(existingSpec);
180180
}
181181
createAndStartSupervisorInternal(spec, shouldUpdateSpec);
182182
return shouldUpdateSpec;

indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -611,7 +611,7 @@ private void changeTaskCountInAutoScalerConfig(int desiredActiveTaskCount)
611611
log.warn("autoScalerConfig is null but scale action is submitted, how can it be ?");
612612
return;
613613
}
614-
autoScalerConfig.setTaskCountStart(desiredActiveTaskCount);
614+
ioConfig.setTaskCount(desiredActiveTaskCount);
615615
try {
616616
Optional<SupervisorManager> supervisorManager = taskMaster.getSupervisorManager();
617617
if (supervisorManager.isPresent()) {
@@ -1179,7 +1179,7 @@ public void stop(boolean stopGracefully)
11791179
catch (Exception e) {
11801180
stateManager.recordThrowableEvent(e);
11811181
log.makeAlert(e, "Exception stopping [%s]", supervisorId)
1182-
.emit();
1182+
.emit();
11831183
}
11841184
}
11851185
}
@@ -1357,7 +1357,7 @@ public void tryInit()
13571357
}
13581358
initRetryCounter++;
13591359
log.makeAlert(e, "Exception starting SeekableStreamSupervisor[%s]", supervisorId)
1360-
.emit();
1360+
.emit();
13611361

13621362
throw new RuntimeException(e);
13631363
}

indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorSpec.java

Lines changed: 16 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -192,11 +192,9 @@ public DruidMonitorSchedulerConfig getMonitorSchedulerConfig()
192192
@Override
193193
public SupervisorTaskAutoScaler createAutoscaler(Supervisor supervisor)
194194
{
195-
AutoScalerConfig autoScalerConfig = ingestionSchema.getIOConfig().getAutoScalerConfig();
196-
if (autoScalerConfig != null
197-
&& autoScalerConfig.getEnableTaskAutoScaler()
198-
&& supervisor instanceof SeekableStreamSupervisor) {
199-
return autoScalerConfig.createAutoScaler(supervisor, this, emitter);
195+
AutoScalerConfig autoScalerCfg = ingestionSchema.getIOConfig().getAutoScalerConfig();
196+
if (autoScalerCfg != null && autoScalerCfg.getEnableTaskAutoScaler() && supervisor instanceof SeekableStreamSupervisor) {
197+
return autoScalerCfg.createAutoScaler(supervisor, this, emitter);
200198
}
201199
return new NoopTaskAutoScaler();
202200
}
@@ -263,29 +261,30 @@ public void validateSpecUpdateTo(SupervisorSpec proposedSpec) throws DruidExcept
263261
}
264262
}
265263

266-
/**
267-
* Writes the <code>taskCountStart</code> value from old config, if not specified in new config.
268-
*
269-
* @param existingSpec the existing supervisor specification to merge configuration values from
270-
*/
271264
@Override
272-
public void mergeSpecConfigs(@NotNull SupervisorSpec existingSpec)
265+
public void merge(@NotNull SupervisorSpec existingSpec)
273266
{
274267
AutoScalerConfig thisAutoScalerConfig = this.getIoConfig().getAutoScalerConfig();
275268
// If the autoscaler is absent or taskCountStart is specified, just return.
276269
if (thisAutoScalerConfig == null || thisAutoScalerConfig.getTaskCountStart() != null) {
277270
return;
278271
}
279272

280-
// TODO[sasha]: use switch expression with pattern matching when we move to Java 21 as minimum requirement.
273+
// Use a switch expression with pattern matching when we move to Java 21 as a minimum requirement.
281274
if (existingSpec instanceof SeekableStreamSupervisorSpec) {
282-
// Note: for some reason, sources are available only for bytecode version 11.
283-
//noinspection PatternVariableCanBeUsed
284-
var spec = (SeekableStreamSupervisorSpec) existingSpec;
275+
SeekableStreamSupervisorSpec spec = (SeekableStreamSupervisorSpec) existingSpec;
285276
AutoScalerConfig autoScalerConfig = spec.getIoConfig().getAutoScalerConfig();
286-
if (autoScalerConfig != null && autoScalerConfig.getTaskCountStart() != null) {
287-
thisAutoScalerConfig.setTaskCountStart(autoScalerConfig.getTaskCountStart());
277+
if (autoScalerConfig == null) {
278+
return;
279+
}
280+
// provided `taskCountStart` > provided `taskCount` > existing `taskCount` > provided `taskCountMin`.
281+
int taskCount = thisAutoScalerConfig.getTaskCountMin();
282+
if (this.getIoConfig().getTaskCount() != null) {
283+
taskCount = this.getIoConfig().getTaskCount();
284+
} else if (spec.getIoConfig().getTaskCount() != null) {
285+
taskCount = spec.getIoConfig().getTaskCount();
288286
}
287+
this.getIoConfig().setTaskCount(taskCount);
289288
}
290289
}
291290

indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/AutoScalerConfig.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,6 @@ public interface AutoScalerConfig
4040
int getTaskCountMax();
4141
int getTaskCountMin();
4242
Integer getTaskCountStart();
43-
void setTaskCountStart(int taskCountStart);
4443
Double getStopTaskCountRatio();
4544
SupervisorTaskAutoScaler createAutoScaler(Supervisor supervisor, SupervisorSpec spec, ServiceEmitter emitter);
4645
}

indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/LagBasedAutoScaler.java

Lines changed: 22 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -124,8 +124,10 @@ public void start()
124124
);
125125
log.info(
126126
"LagBasedAutoScaler will collect lag every [%d] millis and will keep up to [%d] data points for the last [%d] millis for dataSource [%s]",
127-
lagBasedAutoScalerConfig.getLagCollectionIntervalMillis(), lagMetricsQueue.maxSize(),
128-
lagBasedAutoScalerConfig.getLagCollectionRangeMillis(), dataSource
127+
lagBasedAutoScalerConfig.getLagCollectionIntervalMillis(),
128+
lagMetricsQueue.maxSize(),
129+
lagBasedAutoScalerConfig.getLagCollectionRangeMillis(),
130+
dataSource
129131
);
130132
}
131133

@@ -190,27 +192,27 @@ private Runnable computeAndCollectLag()
190192
};
191193
}
192194

193-
/**
194-
* This method determines whether to do scale actions based on collected lag points.
195-
* The current algorithm of scale is straightforward:
196-
* <ul>
197-
* <li>First, compute the proportion of lag points higher/lower than <code>scaleOutThreshold</code>/<code>scaleInThreshold</code>,
198-
* getting <code>scaleOutThreshold</code>/<code>scaleInThreshold</code>.
199-
* <li>Secondly, compare <code>scaleOutThreshold</code>/<code>scaleInThreshold</code> with
200-
* <code>triggerScaleOutFractionThreshold</code>/<code>triggerScaleInFractionThreshold</code>.
201-
* <ul><li>P.S. Scale out action has a higher priority than scale in action.</ul>
202-
* <li>Finally, if <code>scaleOutThreshold</code>/<code>scaleInThreshold</code> is higher than
203-
* <code>triggerScaleOutFractionThreshold</code>/<code>triggerScaleInFractionThreshold</code>, scale out/in action would be triggered.
204-
* </ul>
205-
*
206-
* @param lags the lag metrics of Stream (Kafka/Kinesis)
207-
* @return Integer, target number of tasksCount. -1 means skip scale action.
208-
*/
195+
/**
196+
* This method determines whether to do scale actions based on collected lag points.
197+
* The current algorithm of scale is straightforward:
198+
* <ul>
199+
* <li>First, compute the proportion of lag points higher/lower than {@code scaleOutThreshold/scaleInThreshold},
200+
* getting {@code scaleOutThreshold/scaleInThreshold}.
201+
* <li>Secondly, compare {@code scaleOutThreshold/scaleInThreshold} with
202+
* {@code triggerScaleOutFractionThreshold/triggerScaleInFractionThreshold}.
203+
* <ul><li>P.S. Scale out action has a higher priority than scale in action.</ul>
204+
* <li>Finally, if {@code scaleOutThreshold/scaleInThreshold} is higher than
205+
* {@code triggerScaleOutFractionThreshold/triggerScaleInFractionThreshold}, scale out/in action would be triggered.
206+
* </ul>
207+
*
208+
* @param lags the lag metrics of Stream (Kafka/Kinesis)
209+
* @return Integer, target number of tasksCount. -1 means skip scale action.
210+
*/
209211
private int computeDesiredTaskCount(List<Long> lags)
210212
{
211-
// if supervisor is not suspended, ensure required tasks are running
213+
// if the supervisor is not suspended, ensure required tasks are running
212214
// if suspended, ensure tasks have been requested to gracefully stop
213-
log.debug("Computing desired task count for [%s], based on following lags : [%s]", dataSource, lags);
215+
log.debug("Computing the desired task count for [%s], based on following lags : [%s]", dataSource, lags);
214216
int beyond = 0;
215217
int within = 0;
216218
int metricsCount = lags.size();

indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/LagBasedAutoScalerConfig.java

Lines changed: 2 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -103,8 +103,8 @@ public LagBasedAutoScalerConfig(
103103

104104
this.scaleInStep = scaleInStep != null ? scaleInStep : 1;
105105
this.scaleOutStep = scaleOutStep != null ? scaleOutStep : 2;
106-
this.minTriggerScaleActionFrequencyMillis = minTriggerScaleActionFrequencyMillis
107-
!= null ? minTriggerScaleActionFrequencyMillis : 600000;
106+
this.minTriggerScaleActionFrequencyMillis =
107+
minTriggerScaleActionFrequencyMillis != null ? minTriggerScaleActionFrequencyMillis : 600000;
108108

109109
Preconditions.checkArgument(
110110
stopTaskCountRatio == null || (stopTaskCountRatio > 0.0 && stopTaskCountRatio <= 1.0),
@@ -183,12 +183,6 @@ public Integer getTaskCountStart()
183183
return taskCountStart;
184184
}
185185

186-
@Override
187-
public void setTaskCountStart(int taskCountStart)
188-
{
189-
this.taskCountStart = taskCountStart;
190-
}
191-
192186
@Override
193187
public SupervisorTaskAutoScaler createAutoScaler(Supervisor supervisor, SupervisorSpec spec, ServiceEmitter emitter)
194188
{

indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamSupervisorSpecTest.java

Lines changed: 14 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -784,7 +784,7 @@ public void testSeekableStreamSupervisorSpecWithScaleOut() throws InterruptedExc
784784
int taskCountBeforeScaleOut = supervisor.getIoConfig().getAutoScalerConfig().getTaskCountMin();
785785
Assert.assertEquals(1, taskCountBeforeScaleOut);
786786
Thread.sleep(1000);
787-
int taskCountAfterScaleOut = supervisor.getIoConfig().getAutoScalerConfig().getTaskCountStart();
787+
int taskCountAfterScaleOut = supervisor.getIoConfig().getTaskCount();
788788
Assert.assertEquals(2, taskCountAfterScaleOut);
789789
Assert.assertTrue(
790790
dynamicActionEmitter
@@ -1014,7 +1014,7 @@ public void testSeekableStreamSupervisorSpecWithScaleOutSmallPartitionNumber() t
10141014
Assert.assertNull(supervisor.getIoConfig().getAutoScalerConfig().getTaskCountStart());
10151015
Thread.sleep(1000);
10161016

1017-
int taskCountAfterScaleOut = supervisor.getIoConfig().getAutoScalerConfig().getTaskCountStart();
1017+
int taskCountAfterScaleOut = supervisor.getIoConfig().getTaskCount();
10181018
Assert.assertEquals(2, taskCountAfterScaleOut);
10191019
emitter.verifyEmitted(SeekableStreamSupervisor.AUTOSCALER_SCALING_TIME_METRIC, 1);
10201020

@@ -1066,7 +1066,7 @@ public void testSeekableStreamSupervisorSpecWithScaleIn() throws InterruptedExce
10661066
Assert.assertNull(supervisor.getIoConfig().getAutoScalerConfig().getTaskCountStart());
10671067

10681068
Thread.sleep(1000);
1069-
int taskCountAfterScaleOut = supervisor.getIoConfig().getAutoScalerConfig().getTaskCountStart();
1069+
int taskCountAfterScaleOut = supervisor.getIoConfig().getTaskCount();
10701070
Assert.assertEquals(1, taskCountAfterScaleOut);
10711071
emitter.verifyEmitted(SeekableStreamSupervisor.AUTOSCALER_SCALING_TIME_METRIC, 1);
10721072

@@ -1126,7 +1126,7 @@ public void testSeekableStreamSupervisorSpecWithScaleInThresholdGreaterThanParti
11261126

11271127
Thread.sleep(2000);
11281128
// Then
1129-
Assert.assertEquals(10, (int) supervisor.getIoConfig().getAutoScalerConfig().getTaskCountStart());
1129+
Assert.assertEquals(10, (int) supervisor.getIoConfig().getTaskCount());
11301130

11311131
emitter.verifyEmitted(SeekableStreamSupervisor.AUTOSCALER_SCALING_TIME_METRIC, 1);
11321132

@@ -1605,17 +1605,17 @@ public void testMergeSpecConfigs()
16051605
mockIngestionSchema();
16061606

16071607
// Given
1608-
// Create existing spec with autoscaler config that has taskCountStart set to 5
1608+
// Create existing spec with autoscaler config and taskCount set to 5
16091609
HashMap<String, Object> existingAutoScalerConfig = new HashMap<>();
16101610
existingAutoScalerConfig.put("enableTaskAutoScaler", true);
16111611
existingAutoScalerConfig.put("taskCountMax", 8);
16121612
existingAutoScalerConfig.put("taskCountMin", 1);
1613-
existingAutoScalerConfig.put("taskCountStart", 5);
16141613

16151614
SeekableStreamSupervisorIOConfig existingIoConfig = EasyMock.mock(SeekableStreamSupervisorIOConfig.class);
16161615
EasyMock.expect(existingIoConfig.getAutoScalerConfig())
16171616
.andReturn(mapper.convertValue(existingAutoScalerConfig, AutoScalerConfig.class))
16181617
.anyTimes();
1618+
EasyMock.expect(existingIoConfig.getTaskCount()).andReturn(5).anyTimes();
16191619
EasyMock.replay(existingIoConfig);
16201620

16211621
SeekableStreamSupervisorIngestionSpec existingIngestionSchema = EasyMock.mock(SeekableStreamSupervisorIngestionSpec.class);
@@ -1631,7 +1631,7 @@ public void testMergeSpecConfigs()
16311631
existingIngestionSchema
16321632
);
16331633

1634-
// Create new spec with autoscaler config that has taskCountStart not set (null)
1634+
// Create new spec with autoscaler config that has taskCountStart not set (null) and no taskCount set
16351635
HashMap<String, Object> newAutoScalerConfig = new HashMap<>();
16361636
newAutoScalerConfig.put("enableTaskAutoScaler", true);
16371637
newAutoScalerConfig.put("taskCountMax", 8);
@@ -1641,6 +1641,9 @@ public void testMergeSpecConfigs()
16411641
EasyMock.expect(newIoConfig.getAutoScalerConfig())
16421642
.andReturn(mapper.convertValue(newAutoScalerConfig, AutoScalerConfig.class))
16431643
.anyTimes();
1644+
EasyMock.expect(newIoConfig.getTaskCount()).andReturn(null).anyTimes();
1645+
newIoConfig.setTaskCount(5);
1646+
EasyMock.expectLastCall().once();
16441647
EasyMock.replay(newIoConfig);
16451648

16461649
SeekableStreamSupervisorIngestionSpec newIngestionSchema = EasyMock.mock(SeekableStreamSupervisorIngestionSpec.class);
@@ -1654,15 +1657,14 @@ public void testMergeSpecConfigs()
16541657
newIngestionSchema
16551658
);
16561659

1657-
16581660
// Before merge, taskCountStart should be null
16591661
Assert.assertNull(newSpec.getIoConfig().getAutoScalerConfig().getTaskCountStart());
16601662

1661-
// When
1662-
newSpec.mergeSpecConfigs(existingSpec);
1663+
// When - merge should copy taskCount from existing spec since new spec has no taskCount
1664+
newSpec.merge(existingSpec);
16631665

1664-
// Then - taskCountStart should be copied from existing spec
1665-
Assert.assertEquals(Integer.valueOf(5), newSpec.getIoConfig().getAutoScalerConfig().getTaskCountStart());
1666+
// Then - verify setTaskCount was called (EasyMock will verify the mock expectations)
1667+
EasyMock.verify(newIoConfig);
16661668
}
16671669

16681670
private TestSeekableStreamSupervisorSpec buildDefaultSupervisorSpecWithIngestionSchema(

0 commit comments

Comments
 (0)