Commit 313ba8e

Introduce cost-based task autoscaler for streaming ingestion (#18819)
Changes:
- Capture poll-to-idle-ratio from Kafka consumers on the task side
- Add `WeightedCostFunction` to calculate cost based on current ingestion lag and task idleness ratio
- Add `CostBasedAutoScaler` that fetches metrics from tasks and computes cost to find optimal task count
1 parent ad8c65d commit 313ba8e

19 files changed (+2044, -13 lines)
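
The `WeightedCostFunction` added by this commit does not appear in the excerpt below, so the following is a minimal sketch of the idea the description outlines: score each candidate task count by a weighted sum of a lag term and an idleness term, then report the cheapest candidate as the optimal task count. The class name, the tolerable-lag constant, and the linear busy-time scaling model are illustrative assumptions; only the lagWeight/idleWeight knobs and the ideal idle band [0.2, 0.6] come from the config and test comments further down.

/**
 * Illustrative sketch only, not the committed WeightedCostFunction.
 */
public class WeightedCostSketch
{
  // Assumption: per-task backlog considered "fully bad" at or above this size.
  private static final double TOLERABLE_LAG_PER_TASK = 100_000.0;
  // Ideal idle band mentioned in the integration test comments below.
  private static final double IDLE_LOW = 0.2;
  private static final double IDLE_HIGH = 0.6;

  /**
   * Tries every candidate task count and keeps the one with the lowest cost.
   */
  public static int optimalTaskCount(
      int currentTaskCount,
      long totalLag,
      double currentIdleRatio,
      double lagWeight,
      double idleWeight,
      int taskCountMin,
      int taskCountMax
  )
  {
    int best = currentTaskCount;
    double bestCost = Double.MAX_VALUE;
    for (int n = taskCountMin; n <= taskCountMax; n++) {
      // Assumption: each task's busy fraction shrinks proportionally as tasks are added.
      double projectedBusy = Math.min(1.0, (1.0 - currentIdleRatio) * currentTaskCount / n);
      double cost = cost(totalLag / (double) n, 1.0 - projectedBusy, lagWeight, idleWeight);
      if (cost < bestCost) {
        bestCost = cost;
        best = n;
      }
    }
    return best;
  }

  private static double cost(double lagPerTask, double idleRatio, double lagWeight, double idleWeight)
  {
    double lagCost = Math.min(1.0, lagPerTask / TOLERABLE_LAG_PER_TASK);
    double idleCost;
    if (idleRatio < IDLE_LOW) {
      idleCost = (IDLE_LOW - idleRatio) / IDLE_LOW;            // too busy: pressure to scale up
    } else if (idleRatio > IDLE_HIGH) {
      idleCost = (idleRatio - IDLE_HIGH) / (1.0 - IDLE_HIGH);  // too idle: pressure to scale down
    } else {
      idleCost = 0.0;                                          // inside the ideal band
    }
    return lagWeight * lagCost + idleWeight * idleCost;
  }

  public static void main(String[] args)
  {
    // One overloaded task, 500k records of lag, almost no idle time, and the
    // scale-up weights from the second test below (lagWeight 0.2, idleWeight 0.8):
    // the weighted cost recommends more than 1 task.
    System.out.println(optimalTaskCount(1, 500_000L, 0.05, 0.2, 0.8, 1, 50));
  }
}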
CostBasedAutoScalerIntegrationTest.java (new file)

Lines changed: 267 additions & 0 deletions
@@ -0,0 +1,267 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */

package org.apache.druid.testing.embedded.indexing.autoscaler;

import org.apache.druid.data.input.impl.TimestampSpec;
import org.apache.druid.indexing.kafka.KafkaIndexTaskModule;
import org.apache.druid.indexing.kafka.simulate.KafkaResource;
import org.apache.druid.indexing.kafka.supervisor.KafkaSupervisorSpec;
import org.apache.druid.indexing.seekablestream.supervisor.autoscaler.CostBasedAutoScaler;
import org.apache.druid.indexing.seekablestream.supervisor.autoscaler.CostBasedAutoScalerConfig;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.query.DruidMetrics;
import org.apache.druid.testing.embedded.EmbeddedBroker;
import org.apache.druid.testing.embedded.EmbeddedClusterApis;
import org.apache.druid.testing.embedded.EmbeddedCoordinator;
import org.apache.druid.testing.embedded.EmbeddedDruidCluster;
import org.apache.druid.testing.embedded.EmbeddedHistorical;
import org.apache.druid.testing.embedded.EmbeddedIndexer;
import org.apache.druid.testing.embedded.EmbeddedOverlord;
import org.apache.druid.testing.embedded.EmbeddedRouter;
import org.apache.druid.testing.embedded.indexing.MoreResources;
import org.apache.druid.testing.embedded.junit5.EmbeddedClusterTestBase;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.hamcrest.Matchers;
import org.joda.time.DateTime;
import org.joda.time.DateTimeZone;
import org.joda.time.Seconds;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;

import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.concurrent.Executors;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

import static org.apache.druid.indexing.seekablestream.supervisor.autoscaler.CostBasedAutoScaler.OPTIMAL_TASK_COUNT_METRIC;

/**
 * Integration test for {@link CostBasedAutoScaler}.
 * <p>
 * Tests the autoscaler's ability to compute optimal task counts based on
 * partition count and cost metrics (lag and idle time).
 */
public class CostBasedAutoScalerIntegrationTest extends EmbeddedClusterTestBase
{
  private static final String TOPIC = EmbeddedClusterApis.createTestDatasourceName();
  private static final String EVENT_TEMPLATE = "{\"timestamp\":\"%s\",\"dimension\":\"value%d\",\"metric\":%d}";
  private static final int PARTITION_COUNT = 50;

  private final EmbeddedBroker broker = new EmbeddedBroker();
  private final EmbeddedIndexer indexer = new EmbeddedIndexer();
  private final EmbeddedOverlord overlord = new EmbeddedOverlord();
  private final EmbeddedHistorical historical = new EmbeddedHistorical();
  private final EmbeddedCoordinator coordinator = new EmbeddedCoordinator();
  private KafkaResource kafkaServer;

  @Override
  public EmbeddedDruidCluster createCluster()
  {
    final EmbeddedDruidCluster cluster = EmbeddedDruidCluster.withEmbeddedDerbyAndZookeeper();

    kafkaServer = new KafkaResource()
    {
      @Override
      public void start()
      {
        super.start();
        createTopicWithPartitions(TOPIC, PARTITION_COUNT);
        produceRecordsToKafka(500, 1);
      }

      @Override
      public void stop()
      {
        deleteTopic(TOPIC);
        super.stop();
      }
    };

    // Increase worker capacity to handle more tasks
    indexer.addProperty("druid.segment.handoff.pollDuration", "PT0.1s")
           .addProperty("druid.worker.capacity", "60");

    overlord.addProperty("druid.indexer.task.default.context", "{\"useConcurrentLocks\": true}")
            .addProperty("druid.manager.segments.useIncrementalCache", "ifSynced")
            .addProperty("druid.manager.segments.pollDuration", "PT0.1s");

    coordinator.addProperty("druid.manager.segments.useIncrementalCache", "ifSynced");

    cluster.useLatchableEmitter()
           .useDefaultTimeoutForLatchableEmitter(120)
           .addServer(coordinator)
           .addServer(overlord)
           .addServer(indexer)
           .addServer(broker)
           .addServer(historical)
           .addExtension(KafkaIndexTaskModule.class)
           .addCommonProperty("druid.monitoring.emissionPeriod", "PT0.5s")
           .addResource(kafkaServer)
           .addServer(new EmbeddedRouter());

    return cluster;
  }

  @Test
  @Timeout(45)
  public void test_autoScaler_computesOptimalTaskCountAndProducesScaleDown()
  {
    final String superId = dataSource + "_super";
    final int initialTaskCount = 10;

    final CostBasedAutoScalerConfig autoScalerConfig = CostBasedAutoScalerConfig
        .builder()
        .enableTaskAutoScaler(true)
        .taskCountMin(1)
        .taskCountMax(100)
        .taskCountStart(initialTaskCount)
        .scaleActionPeriodMillis(1500)
        .minTriggerScaleActionFrequencyMillis(3000)
        // Weight configuration: strongly favor lag reduction over idle time
        .lagWeight(0.9)
        .idleWeight(0.1)
        .build();

    final KafkaSupervisorSpec spec = createKafkaSupervisorWithAutoScaler(superId, autoScalerConfig, initialTaskCount);

    // Submit the supervisor
    Assertions.assertEquals(superId, cluster.callApi().postSupervisor(spec));

    // Wait for the supervisor to be healthy and running
    overlord.latchableEmitter()
            .waitForEvent(event -> event.hasMetricName("task/run/time")
                                        .hasDimension(DruidMetrics.DATASOURCE, dataSource));

    // Wait for the autoscaler to emit an optimalTaskCount metric indicating scale-down.
    // We expect the optimal task count to be 6.
    overlord.latchableEmitter().waitForEvent(
        event -> event.hasMetricName(OPTIMAL_TASK_COUNT_METRIC)
                      .hasValueMatching(Matchers.equalTo(6L))
    );

    // Suspend the supervisor
    cluster.callApi().postSupervisor(spec.createSuspendedSpec());
  }

  @Test
  @Timeout(125)
  public void test_autoScaler_computesOptimalTaskCountAndProducesScaleUp()
  {
    final String superId = dataSource + "_super_scaleup";

    // Start with a low task count (1 task for 50 partitions) and produce a large amount of data
    // to create lag pressure and a low idle ratio, which should trigger a scale-up decision.
    // With the ideal idle range [0.2, 0.6], a single overloaded task will have idle < 0.2,
    // triggering the cost function to recommend more tasks.
    final int lowInitialTaskCount = 1;

    // Produce additional records to create a backlog / lag.
    // This ensures tasks are busy processing (low idle ratio).
    Executors.newSingleThreadExecutor().submit(() -> produceRecordsToKafka(500_000, 20));

    // These values were carefully hand-picked so that the test passes reliably.
    final CostBasedAutoScalerConfig autoScalerConfig = CostBasedAutoScalerConfig
        .builder()
        .enableTaskAutoScaler(true)
        .taskCountMin(1)
        .taskCountMax(50)
        .taskCountStart(lowInitialTaskCount)
        .scaleActionPeriodMillis(500)
        .minTriggerScaleActionFrequencyMillis(1000)
        .lagWeight(0.2)
        .idleWeight(0.8)
        .build();

    final KafkaSupervisorSpec kafkaSupervisorSpec = createKafkaSupervisorWithAutoScaler(
        superId,
        autoScalerConfig,
        lowInitialTaskCount
    );

    // Submit the supervisor
    Assertions.assertEquals(superId, cluster.callApi().postSupervisor(kafkaSupervisorSpec));

    // Wait for the supervisor to be healthy and running
    overlord.latchableEmitter()
            .waitForEvent(event -> event.hasMetricName("task/run/time")
                                        .hasDimension(DruidMetrics.DATASOURCE, dataSource));

    // With 50 partitions and high lag creating a low idle ratio (< 0.2),
    // the cost function must recommend scaling up to at least 2 tasks.
    overlord.latchableEmitter().waitForEvent(
        event -> event.hasMetricName(OPTIMAL_TASK_COUNT_METRIC)
                      .hasValueMatching(Matchers.greaterThan(1L))
    );

    // Suspend the supervisor
    cluster.callApi().postSupervisor(kafkaSupervisorSpec.createSuspendedSpec());
  }

  private void produceRecordsToKafka(int recordCount, int iterations)
  {
    int recordCountPerSlice = recordCount / iterations;
    int counter = 0;
    for (int i = 0; i < iterations; i++) {
      DateTime timestamp = DateTime.now(DateTimeZone.UTC);
      List<ProducerRecord<byte[], byte[]>> records = IntStream
          .range(counter, counter + recordCountPerSlice)
          .mapToObj(k -> new ProducerRecord<byte[], byte[]>(
              TOPIC,
              k % PARTITION_COUNT,
              null,
              StringUtils.format(EVENT_TEMPLATE, timestamp, k, k)
                         .getBytes(StandardCharsets.UTF_8)
          ))
          .collect(Collectors.toList());

      kafkaServer.produceRecordsToTopic(records);
      try {
        Thread.sleep(100L);
        counter += recordCountPerSlice;
      }
      catch (InterruptedException e) {
        throw new RuntimeException(e);
      }
    }
  }

  private KafkaSupervisorSpec createKafkaSupervisorWithAutoScaler(
      String supervisorId,
      CostBasedAutoScalerConfig autoScalerConfig,
      int taskCount
  )
  {
    return MoreResources.Supervisor.KAFKA_JSON
        .get()
        .withDataSchema(schema -> schema.withTimestamp(new TimestampSpec("timestamp", "iso", null)))
        .withTuningConfig(tuningConfig -> tuningConfig.withMaxRowsPerSegment(100))
        .withIoConfig(
            ioConfig -> ioConfig
                .withConsumerProperties(kafkaServer.consumerProperties())
                .withTaskCount(taskCount)
                .withTaskDuration(Seconds.THREE.toPeriod())
                .withAutoScalerConfig(autoScalerConfig)
        )
        .withId(supervisorId)
        .build(dataSource, TOPIC);
  }
}

extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaConsumerMonitor.java

Lines changed: 22 additions & 1 deletion
@@ -19,6 +19,7 @@

 package org.apache.druid.indexing.kafka;

+import com.google.common.util.concurrent.AtomicDouble;
 import org.apache.druid.error.DruidException;
 import org.apache.druid.java.util.common.logger.Logger;
 import org.apache.druid.java.util.emitter.service.ServiceEmitter;
@@ -47,9 +48,11 @@ public class KafkaConsumerMonitor extends AbstractMonitor
   private static final String PARTITION_TAG = "partition";
   private static final String NODE_ID_TAG = "node-id";

+  private static final String POLL_IDLE_RATIO_METRIC_NAME = "poll-idle-ratio-avg";
+
   /**
    * Kafka metric name -> Kafka metric descriptor. Taken from
-   * https://kafka.apache.org/documentation/#consumer_fetch_monitoring.
+   * <a href="https://kafka.apache.org/documentation/#consumer_fetch_monitoring">Kafka documentation</a>.
    */
   private static final Map<String, KafkaConsumerMetric> METRICS =
       Stream.of(
@@ -129,6 +132,7 @@ public class KafkaConsumerMonitor extends AbstractMonitor

   private final KafkaConsumer<?, ?> consumer;
   private final Map<MetricName, AtomicLong> counters = new HashMap<>();
+  private final AtomicDouble pollIdleRatioAvg = new AtomicDouble(1.0d);

   public KafkaConsumerMonitor(final KafkaConsumer<?, ?> consumer)
   {
@@ -172,6 +176,13 @@ public boolean doMonitor(final ServiceEmitter emitter)
          emitter.emit(builder.setMetric(kafkaConsumerMetric.getDruidMetricName(), emitValue));
        }
      }
+
+      // Capture `poll-idle-ratio-avg` metric for autoscaler purposes.
+      if (POLL_IDLE_RATIO_METRIC_NAME.equals(metricName.name())) {
+        if (entry.getValue().metricValue() != null) {
+          pollIdleRatioAvg.set(((Number) entry.getValue().metricValue()).doubleValue());
+        }
+      }
    }

    return !stopAfterNext;
@@ -181,4 +192,14 @@ public void stopAfterNextEmit()
   {
     stopAfterNext = true;
   }
+
+  /**
+   * Average poll-to-idle ratio as reported by the Kafka consumer.
+   * A value of 0 represents that the consumer is never idle, i.e. always consuming.
+   * A value of 1 represents that the consumer is always idle, i.e. not receiving data.
+   */
+  public double getPollIdleRatioAvg()
+  {
+    return pollIdleRatioAvg.get();
+  }
 }
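
For reference, the lookup that doMonitor performs above can also be written as a standalone helper against the public Kafka client API (KafkaConsumer.metrics(), MetricName.name(), Metric.metricValue()). This is a sketch for illustration, not code from the commit:

import java.util.Map;

import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.Metric;
import org.apache.kafka.common.MetricName;

public final class PollIdleRatioReader
{
  private PollIdleRatioReader() {}

  /**
   * Scans the consumer's metric registry for Kafka's "poll-idle-ratio-avg".
   */
  public static double read(KafkaConsumer<?, ?> consumer)
  {
    for (Map.Entry<MetricName, ? extends Metric> entry : consumer.metrics().entrySet()) {
      Object value = entry.getValue().metricValue();
      if ("poll-idle-ratio-avg".equals(entry.getKey().name()) && value instanceof Number) {
        return ((Number) value).doubleValue();
      }
    }
    // Same default as the monitor's AtomicDouble(1.0d): no data yet reads as fully idle.
    return 1.0;
  }
}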

extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaIndexTask.java

Lines changed: 1 addition & 1 deletion
@@ -49,7 +49,7 @@ public class KafkaIndexTask extends SeekableStreamIndexTask<KafkaTopicPartition,

   /**
    * Resources that a {@link KafkaIndexTask} is authorized to use. Includes
-   * performing a read action on external resource of type
+   * performing a read action on an external resource of type
    */
   public static final Set<ResourceAction> INPUT_SOURCE_RESOURCES = Set.of(
       AuthorizationUtils.createExternalResourceReadAction(KafkaIndexTaskModule.SCHEME)

extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaRecordSupplier.java

Lines changed: 6 additions & 0 deletions
@@ -234,6 +234,12 @@ public Map<KafkaTopicPartition, Long> getLatestSequenceNumbers(Set<StreamPartiti
     ));
   }

+  @Override
+  public double getPollIdleRatioMetric()
+  {
+    return monitor.getPollIdleRatioAvg();
+  }
+
   @Override
   public Set<KafkaTopicPartition> getPartitionIds(String stream)
   {
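
A hypothetical call site, to show the consuming side of the new method; the interface it overrides is not part of this excerpt, and the threshold below simply mirrors the ideal idle band used in the integration test:

// 0.0 = consumer always busy, 1.0 = consumer fully idle (see getPollIdleRatioAvg above).
double idleRatio = recordSupplier.getPollIdleRatioMetric();
if (idleRatio < 0.2) {
  // Tasks are busier than the ideal idle band allows; the cost function
  // will weigh this as pressure toward a higher task count.
}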

indexing-service/src/main/java/org/apache/druid/indexing/common/stats/DropwizardRowIngestionMeters.java

Lines changed: 3 additions & 3 deletions
@@ -29,9 +29,9 @@

 public class DropwizardRowIngestionMeters implements RowIngestionMeters
 {
-  private static final String ONE_MINUTE_NAME = "1m";
-  private static final String FIVE_MINUTE_NAME = "5m";
-  private static final String FIFTEEN_MINUTE_NAME = "15m";
+  public static final String ONE_MINUTE_NAME = "1m";
+  public static final String FIVE_MINUTE_NAME = "5m";
+  public static final String FIFTEEN_MINUTE_NAME = "15m";

   private final Meter processed;
   private final Meter processedBytes;

indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTask.java

Lines changed: 1 addition & 1 deletion
@@ -74,7 +74,7 @@ public abstract class SeekableStreamIndexTask<PartitionIdType, SequenceOffsetTyp
   protected final TaskLockType lockTypeToUse;
   protected final String supervisorId;

-  // Lazily initialized, to avoid calling it on the overlord when tasks are instantiated.
+  // Lazily initialized to avoid calling it on the overlord when tasks are instantiated.
   // See https://github.com/apache/druid/issues/7724 for issues that can cause.
   // By the way, lazily init is synchronized because the runner may be needed in multiple threads.
   private final Supplier<SeekableStreamIndexTaskRunner<PartitionIdType, SequenceOffsetType, ?>> runnerSupplier;

indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskClientAsyncImpl.java

Lines changed: 1 addition & 1 deletion
@@ -373,7 +373,7 @@ private Map<PartitionIdType, SequenceOffsetType> deserializeOffsetsMap(final byt

   /**
    * Helper for {@link #pauseAsync}.
-   *
+   * <p>
    * Calls {@link #getStatusAsync} in a loop until a task is paused, then calls {@link #getCurrentOffsetsAsync} to
    * get the post-pause offsets for the task.
    */
