Skip to content

Commit 426d296

Browse files
committed
Tests fixes and cleanups
1 parent 9679527 commit 426d296

File tree

6 files changed

+439
-579
lines changed

6 files changed

+439
-579
lines changed

extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -389,7 +389,7 @@ public SeekableStreamIndexTaskClient<KafkaTopicPartition, Long> build(
389389

390390
supervisor.start();
391391
int taskCountBeforeScale = supervisor.getIoConfig().getTaskCount();
392-
Assert.assertNull(supervisor.getIoConfig().getAutoScalerConfig().getTaskCountStart());
392+
Assert.assertEquals(1, taskCountBeforeScale);
393393
autoscaler.start();
394394
supervisor.runInternal();
395395
Thread.sleep(1000);

extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTest.java

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -355,12 +355,13 @@ public void testNoInitialStateWithAutoScaleOut() throws Exception
355355
replayAll();
356356

357357
supervisor.start();
358-
Assert.assertNull(supervisor.getIoConfig().getAutoScalerConfig().getTaskCountStart());
358+
int taskCountBeforeScale = supervisor.getIoConfig().getTaskCount();
359+
Assert.assertEquals(1, taskCountBeforeScale);
359360
autoscaler.start();
360361

361362
supervisor.runInternal();
362363
verifyAll();
363-
Thread.sleep(1000);
364+
Thread.sleep(1 * 1000);
364365
int taskCountAfterScale = supervisor.getIoConfig().getTaskCount();
365366
Assert.assertEquals(2, taskCountAfterScale);
366367
autoscaler.stop();
@@ -432,7 +433,8 @@ public void testNoInitialStateWithAutoScaleIn() throws Exception
432433
supervisor.getIoConfig().setTaskCount(2);
433434

434435
supervisor.start();
435-
Assert.assertNull(supervisor.getIoConfig().getAutoScalerConfig().getTaskCountStart());
436+
int taskCountBeforeScale = supervisor.getIoConfig().getTaskCount();
437+
Assert.assertEquals(2, taskCountBeforeScale);
436438
autoscaler.start();
437439

438440
supervisor.runInternal();

indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java

Lines changed: 27 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -576,41 +576,37 @@ private boolean changeTaskCount(int desiredActiveTaskCount)
576576

577577
if (desiredActiveTaskCount < 0 || desiredActiveTaskCount == currentActiveTaskCount) {
578578
return false;
579+
} else {
580+
log.info(
581+
"Starting scale action, current active task count is [%d] and desired task count is [%d] for supervisor[%s] for dataSource[%s].",
582+
currentActiveTaskCount,
583+
desiredActiveTaskCount,
584+
supervisorId,
585+
dataSource
586+
);
587+
final Stopwatch scaleActionStopwatch = Stopwatch.createStarted();
588+
gracefulShutdownInternal();
589+
changeTaskCountInIOConfig(desiredActiveTaskCount);
590+
clearAllocationInfo();
591+
emitter.emit(ServiceMetricEvent.builder()
592+
.setDimension(DruidMetrics.SUPERVISOR_ID, supervisorId)
593+
.setDimension(DruidMetrics.DATASOURCE, dataSource)
594+
.setDimension(DruidMetrics.STREAM, getIoConfig().getStream())
595+
.setDimensionIfNotNull(
596+
DruidMetrics.TAGS,
597+
spec.getContextValue(DruidMetrics.TAGS)
598+
)
599+
.setMetric(
600+
AUTOSCALER_SCALING_TIME_METRIC,
601+
scaleActionStopwatch.millisElapsed()
602+
));
603+
log.info("Changed taskCount to [%s] for supervisor[%s] for dataSource[%s].", desiredActiveTaskCount, supervisorId, dataSource);
604+
return true;
579605
}
580-
log.info(
581-
"Starting scale action, current active task count is [%d] and desired task count is [%d] for supervisor[%s] for dataSource[%s].",
582-
currentActiveTaskCount,
583-
desiredActiveTaskCount,
584-
supervisorId,
585-
dataSource
586-
);
587-
final Stopwatch scaleActionStopwatch = Stopwatch.createStarted();
588-
gracefulShutdownInternal();
589-
changeTaskCountInAutoScalerConfig(desiredActiveTaskCount);
590-
clearAllocationInfo();
591-
emitter.emit(ServiceMetricEvent.builder()
592-
.setDimension(DruidMetrics.SUPERVISOR_ID, supervisorId)
593-
.setDimension(DruidMetrics.DATASOURCE, dataSource)
594-
.setDimension(DruidMetrics.STREAM, getIoConfig().getStream())
595-
.setDimensionIfNotNull(
596-
DruidMetrics.TAGS,
597-
spec.getContextValue(DruidMetrics.TAGS)
598-
)
599-
.setMetric(
600-
AUTOSCALER_SCALING_TIME_METRIC,
601-
scaleActionStopwatch.millisElapsed()
602-
));
603-
log.info("Changed taskCount to [%s] for supervisor[%s] for dataSource[%s].", desiredActiveTaskCount, supervisorId, dataSource);
604-
return true;
605606
}
606607

607-
private void changeTaskCountInAutoScalerConfig(int desiredActiveTaskCount)
608+
private void changeTaskCountInIOConfig(int desiredActiveTaskCount)
608609
{
609-
// Sanity check.
610-
if (autoScalerConfig == null) {
611-
log.warn("autoScalerConfig is null but scale action is submitted, how can it be ?");
612-
return;
613-
}
614610
ioConfig.setTaskCount(desiredActiveTaskCount);
615611
try {
616612
Optional<SupervisorManager> supervisorManager = taskMaster.getSupervisorManager();

indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorSpec.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -192,9 +192,9 @@ public DruidMonitorSchedulerConfig getMonitorSchedulerConfig()
192192
@Override
193193
public SupervisorTaskAutoScaler createAutoscaler(Supervisor supervisor)
194194
{
195-
AutoScalerConfig autoScalerCfg = ingestionSchema.getIOConfig().getAutoScalerConfig();
196-
if (autoScalerCfg != null && autoScalerCfg.getEnableTaskAutoScaler() && supervisor instanceof SeekableStreamSupervisor) {
197-
return autoScalerCfg.createAutoScaler(supervisor, this, emitter);
195+
AutoScalerConfig autoScalerConfig = ingestionSchema.getIOConfig().getAutoScalerConfig();
196+
if (autoScalerConfig != null && autoScalerConfig.getEnableTaskAutoScaler() && supervisor instanceof SeekableStreamSupervisor) {
197+
return autoScalerConfig.createAutoScaler(supervisor, this, emitter);
198198
}
199199
return new NoopTaskAutoScaler();
200200
}

0 commit comments

Comments
 (0)