Commit e2aa55a

Finish the integration test; introduce idle metrics
1 parent 4a18380 commit e2aa55a

File tree

10 files changed: +107 -153 lines changed


embedded-tests/src/test/java/org/apache/druid/testing/embedded/indexing/autoscaler/CostBasedAutoScalerIntegrationTest.java

Lines changed: 23 additions & 41 deletions
@@ -26,7 +26,6 @@
 import org.apache.druid.indexing.overlord.supervisor.SupervisorStatus;
 import org.apache.druid.indexing.seekablestream.supervisor.autoscaler.CostBasedAutoScalerConfig;
 import org.apache.druid.java.util.common.StringUtils;
-import org.apache.druid.query.DruidMetrics;
 import org.apache.druid.testing.embedded.EmbeddedBroker;
 import org.apache.druid.testing.embedded.EmbeddedClusterApis;
 import org.apache.druid.testing.embedded.EmbeddedCoordinator;
@@ -35,15 +34,13 @@
 import org.apache.druid.testing.embedded.EmbeddedIndexer;
 import org.apache.druid.testing.embedded.EmbeddedOverlord;
 import org.apache.druid.testing.embedded.EmbeddedRouter;
-import org.apache.druid.testing.embedded.emitter.LatchableEmitterModule;
 import org.apache.druid.testing.embedded.indexing.MoreResources;
 import org.apache.druid.testing.embedded.junit5.EmbeddedClusterTestBase;
 import org.apache.kafka.clients.producer.ProducerRecord;
 import org.hamcrest.Matchers;
 import org.joda.time.DateTime;
 import org.joda.time.DateTimeZone;
 import org.junit.jupiter.api.Assertions;
-import org.junit.jupiter.api.Disabled;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.Timeout;

@@ -52,21 +49,17 @@
 import java.util.stream.Collectors;
 import java.util.stream.IntStream;
 
-import static org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisor.AUTOSCALER_REQUIRED_TASKS_METRIC;
-
 /**
  * Integration test for {@link org.apache.druid.indexing.seekablestream.supervisor.autoscaler.CostBasedAutoScaler}.
  * <p>
- * Tests the autoscaler's ability to compute optimal task counts based
- * on partition count and cost metrics (lag and idle time).
+ * Tests the autoscaler's ability to compute optimal task counts based on partition count and cost metrics (lag and idle time).
  */
 public class CostBasedAutoScalerIntegrationTest extends EmbeddedClusterTestBase
 {
   private static final String TOPIC = EmbeddedClusterApis.createTestDatasourceName();
   private static final String EVENT_TEMPLATE = "{\"timestamp\":\"%s\",\"dimension\":\"value%d\",\"metric\":%d}";
-  ;
-  private static final int PARTITION_COUNT = 100;
-  private static final int INITIAL_TASK_COUNT = 25;
+  private static final int PARTITION_COUNT = 10;
+  private static final int INITIAL_TASK_COUNT = 10;
 
   private final EmbeddedBroker broker = new EmbeddedBroker();
   private final EmbeddedIndexer indexer = new EmbeddedIndexer();
@@ -87,6 +80,7 @@ public void start()
   {
     super.start();
    createTopicWithPartitions(TOPIC, PARTITION_COUNT);
+    produceRecordsToKafka(500);
   }
 
   @Override
@@ -107,31 +101,27 @@ public void stop()
 
     coordinator.addProperty("druid.manager.segments.useIncrementalCache", "ifSynced");
 
-    cluster.addExtension(KafkaIndexTaskModule.class)
-           .addExtension(LatchableEmitterModule.class)
-           .useDefaultTimeoutForLatchableEmitter(300)
-           .addCommonProperty("druid.emitter", "latching")
-           .addCommonProperty("druid.monitoring.emissionPeriod", "PT0.1s")
-           .addResource(kafkaServer)
+    cluster.useLatchableEmitter()
            .addServer(coordinator)
            .addServer(overlord)
            .addServer(indexer)
            .addServer(broker)
            .addServer(historical)
+           .addExtension(KafkaIndexTaskModule.class)
+           .addCommonProperty("druid.monitoring.emissionPeriod", "PT0.5s")
+           .addResource(kafkaServer)
            .addServer(new EmbeddedRouter());
 
     return cluster;
   }
 
-  @Disabled
   @Test
   @Timeout(45)
   public void test_autoScaler_computesOptimalTaskCountAndProduceScaleDown()
   {
-    final String supervisorId = dataSource + "_supe";
+    final String superId = dataSource + "_super";
 
-    // Produce some amount of data to kafka, to trigger a 'scale down' decision to 17 tasks.
-    produceRecordsToKafka(50);
+    // Produce some amount of data to kafka, to trigger a 'scale down' decision to 4 tasks.
 
     final CostBasedAutoScalerConfig autoScalerConfig = CostBasedAutoScalerConfig
         .builder()
@@ -140,35 +130,27 @@ public void test_autoScaler_computesOptimalTaskCountAndProduceScaleDown()
         .taskCountMax(100)
         .taskCountStart(INITIAL_TASK_COUNT)
         .metricsCollectionIntervalMillis(3000)
-        .metricsCollectionRangeMillis(2000)
-        .scaleActionStartDelayMillis(3000)
-        .scaleActionPeriodMillis(2000)
-        .minTriggerScaleActionFrequencyMillis(3000)
+        .scaleActionStartDelayMillis(5000)
+        .scaleActionPeriodMillis(5000)
+        .minTriggerScaleActionFrequencyMillis(5000)
         // Weight configuration: strongly favor lag reduction over idle time
         .lagWeight(0.9)
         .idleWeight(0.1)
         .build();
 
-    final KafkaSupervisorSpec kafkaSupervisorSpec = createKafkaSupervisorWithAutoScaler(
-        supervisorId,
-        autoScalerConfig
-    );
+    final KafkaSupervisorSpec kafkaSupervisorSpec = createKafkaSupervisorWithAutoScaler(superId, autoScalerConfig);
 
     // Submit the supervisor
-    Assertions.assertEquals(
-        supervisorId,
-        cluster.callApi().postSupervisor(kafkaSupervisorSpec)
-    );
+    Assertions.assertEquals(superId, cluster.callApi().postSupervisor(kafkaSupervisorSpec));
 
     // Wait for the supervisor to be healthy and running
-    waitForSupervisorRunning(supervisorId);
+    waitForSupervisorRunning(superId);
 
-    // Wait for autoscaler to emit optimalTaskCount metric indicating scale-up
-    // We expect the optimal task count to be either 34 or 50.
+    // Wait for autoscaler to emit optimalTaskCount metric indicating scale-down
+    // We expect the optimal task count to be 4.
     overlord.latchableEmitter().waitForEvent(
-        event -> event.hasMetricName(AUTOSCALER_REQUIRED_TASKS_METRIC)
-                      .hasDimension(DruidMetrics.DATASOURCE, dataSource)
-                      .hasValueMatching(Matchers.equalTo(17L))
+        event -> event.hasMetricName("task/autoScaler/costBased/optimalTaskCount")
+                      .hasValueMatching(Matchers.equalTo(4L))
     );
 
     // Suspend the supervisor
@@ -177,7 +159,7 @@ public void test_autoScaler_computesOptimalTaskCountAndProduceScaleDown()
 
   private void waitForSupervisorRunning(String supervisorId)
   {
-    int maxAttempts = 120;
+    int maxAttempts = 10;
     int attempt = 0;
     while (attempt < maxAttempts) {
       SupervisorStatus status = cluster.callApi().getSupervisorStatus(supervisorId);
@@ -186,7 +168,7 @@ private void waitForSupervisorRunning(String supervisorId)
       }
       attempt++;
       try {
-        Thread.sleep(3000);
+        Thread.sleep(1000);
       }
       catch (InterruptedException e) {
         Thread.currentThread().interrupt();
@@ -222,7 +204,7 @@ private KafkaSupervisorSpec createKafkaSupervisorWithAutoScaler(
     return MoreResources.Supervisor.KAFKA_JSON
         .get()
         .withDataSchema(schema -> schema.withTimestamp(new TimestampSpec("timestamp", "iso", null)))
-        .withTuningConfig(tuningConfig -> tuningConfig.withMaxRowsPerSegment(1000))
+        .withTuningConfig(tuningConfig -> tuningConfig.withMaxRowsPerSegment(100))
        .withIoConfig(
            ioConfig -> ioConfig
                .withConsumerProperties(kafkaServer.consumerProperties())
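
The test above pins the autoscaler's decision at 4 tasks for 10 partitions, with lag weighted at 0.9 and idle time at 0.1. As a rough, self-contained illustration of how a weighted cost over lag and idle time can trade off against each other, here is a minimal sketch; the cost formula, names, and signals below are assumptions for illustration, not the actual CostBasedAutoScaler code:

// Illustrative sketch only; the real cost model lives in CostBasedAutoScaler.
public class WeightedCostSketch
{
  /**
   * Picks the task count with the lowest weighted cost. Assumes lag pressure
   * shrinks as tasks are added, while idle capacity cost grows with task count.
   */
  public static int pickTaskCount(
      double totalLag,        // hypothetical aggregate lag signal
      double idleRatio,       // e.g. Kafka's poll-idle-ratio-avg, in [0, 1]
      int partitionCount,
      double lagWeight,       // 0.9 in the test config
      double idleWeight       // 0.1 in the test config
  )
  {
    int best = 1;
    double bestCost = Double.MAX_VALUE;
    for (int tasks = 1; tasks <= partitionCount; tasks++) {
      double lagCost = lagWeight * (totalLag / tasks);   // lag handled per task
      double idleCost = idleWeight * idleRatio * tasks;  // cost of idle tasks
      double cost = lagCost + idleCost;
      if (cost < bestCost) {
        bestCost = cost;
        best = tasks;
      }
    }
    return best;
  }
}

Under this toy model, a modest lag combined with a high idle ratio favors few tasks, which matches the scale-down from 10 initial tasks that the test expects.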

extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaConsumerMonitor.java

Lines changed: 17 additions & 0 deletions
@@ -19,6 +19,7 @@
 
 package org.apache.druid.indexing.kafka;
 
+import com.google.common.util.concurrent.AtomicDouble;
 import org.apache.druid.error.DruidException;
 import org.apache.druid.java.util.common.logger.Logger;
 import org.apache.druid.java.util.emitter.service.ServiceEmitter;
@@ -47,6 +48,8 @@ public class KafkaConsumerMonitor extends AbstractMonitor
   private static final String PARTITION_TAG = "partition";
   private static final String NODE_ID_TAG = "node-id";
 
+  private static final String POLL_IDLE_RATIO_METRIC_NAME = "poll-idle-ratio-avg";
+
   /**
    * Kafka metric name -> Kafka metric descriptor. Taken from
    * https://kafka.apache.org/documentation/#consumer_fetch_monitoring.
@@ -129,6 +132,7 @@ public class KafkaConsumerMonitor extends AbstractMonitor
 
   private final KafkaConsumer<?, ?> consumer;
   private final Map<MetricName, AtomicLong> counters = new HashMap<>();
+  private final AtomicDouble pollIdleRatioAvg = new AtomicDouble(1.0d);
 
   public KafkaConsumerMonitor(final KafkaConsumer<?, ?> consumer)
   {
@@ -172,6 +176,13 @@ public boolean doMonitor(final ServiceEmitter emitter)
         emitter.emit(builder.setMetric(kafkaConsumerMetric.getDruidMetricName(), emitValue));
       }
     }
+
+      // Capture the `poll-idle-ratio-avg` metric for autoscaler purposes.
+      if (POLL_IDLE_RATIO_METRIC_NAME.equals(metricName.name())) {
+        if (entry.getValue().metricValue() != null) {
+          pollIdleRatioAvg.set(((Number) entry.getValue().metricValue()).doubleValue());
+        }
+      }
   }
 
   return !stopAfterNext;
@@ -181,4 +192,10 @@ public void stopAfterNextEmit()
   {
     stopAfterNext = true;
   }
+
+  // Intended to be used in the future to forward this metric to the supervisor.
+  public double getPollIdleRatioAvg()
+  {
+    return pollIdleRatioAvg.get();
+  }
 }
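
The monitor above piggybacks on the metrics map that the Kafka client already maintains: `poll-idle-ratio-avg` is a standard consumer metric giving the average fraction of time spent idle inside poll() rather than processing records. A minimal standalone sketch of reading it directly from a consumer (the probe class here is illustrative, not part of the commit):

import java.util.Map;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.Metric;
import org.apache.kafka.common.MetricName;

public class PollIdleRatioProbe
{
  // Returns the consumer's poll-idle-ratio-avg, or null if not reported yet.
  public static Double read(KafkaConsumer<?, ?> consumer)
  {
    // KafkaConsumer.metrics() exposes all client metrics keyed by MetricName.
    for (Map.Entry<MetricName, ? extends Metric> entry : consumer.metrics().entrySet()) {
      if ("poll-idle-ratio-avg".equals(entry.getKey().name())) {
        Object value = entry.getValue().metricValue();
        if (value instanceof Number) {
          return ((Number) value).doubleValue();
        }
      }
    }
    return null;
  }
}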

extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaRecordSupplier.java

Lines changed: 29 additions & 17 deletions
@@ -160,9 +160,7 @@ public Set<StreamPartition<KafkaTopicPartition>> getAssignment()
         .stream()
         .map(e -> new StreamPartition<>(
             stream,
-            new KafkaTopicPartition(multiTopic, e.topic(),
-                e.partition()
-            )
+            new KafkaTopicPartition(multiTopic, e.topic(), e.partition())
         ))
         .collect(Collectors.toSet()));
   }
@@ -223,17 +221,23 @@ public Long getPosition(StreamPartition<KafkaTopicPartition> partition)
   public Map<KafkaTopicPartition, Long> getLatestSequenceNumbers(Set<StreamPartition<KafkaTopicPartition>> partitions)
   {
     return wrapExceptions(() -> CollectionUtils.mapKeys(
-        consumer.endOffsets(
-            partitions
-                .stream()
-                .map(e -> e.getPartitionId().asTopicPartition(e.getStream()))
-                .collect(Collectors.toList()
-                )
-        ),
-        p -> new KafkaTopicPartition(multiTopic, p.topic(), p.partition())
+        consumer.endOffsets(
+            partitions
+                .stream()
+                .map(e -> e.getPartitionId().asTopicPartition(e.getStream()))
+                .collect(Collectors.toList()
+                )
+        ),
+        p -> new KafkaTopicPartition(multiTopic, p.topic(), p.partition())
     ));
   }
 
+  @Override
+  public double getPollIdleRatioMetric()
+  {
+    return monitor.getPollIdleRatioAvg();
+  }
+
   @Override
   public Set<KafkaTopicPartition> getPartitionIds(String stream)
   {
@@ -250,16 +254,20 @@ public Set<KafkaTopicPartition> getPartitionIds(String stream)
       if (allPartitions.isEmpty()) {
         throw DruidException.forPersona(DruidException.Persona.OPERATOR)
                             .ofCategory(DruidException.Category.INVALID_INPUT)
-                            .build("No partitions found for topics that match given pattern [%s]."
-                                   + "Check that the pattern regex is correct and matching topics exists", stream);
+                            .build(
+                                "No partitions found for topics that match given pattern [%s]."
+                                + "Check that the pattern regex is correct and matching topics exists", stream
+                            );
       }
     } else {
       allPartitions = consumer.partitionsFor(stream);
       if (allPartitions == null) {
         throw DruidException.forPersona(DruidException.Persona.OPERATOR)
                             .ofCategory(DruidException.Category.INVALID_INPUT)
-                            .build("Topic [%s] is not found."
-                                   + " Check that the topic exists in Kafka cluster", stream);
+                            .build(
+                                "Topic [%s] is not found."
+                                + " Check that the topic exists in Kafka cluster", stream
+                            );
       }
     }
     return allPartitions.stream()
@@ -316,7 +324,10 @@ public static void addConsumerPropertiesFromConfig(
     // Additional DynamicConfigProvider based extensible support for all consumer properties
     Object dynamicConfigProviderJson = consumerProperties.get(KafkaSupervisorIOConfig.DRUID_DYNAMIC_CONFIG_PROVIDER_KEY);
     if (dynamicConfigProviderJson != null) {
-      DynamicConfigProvider dynamicConfigProvider = configMapper.convertValue(dynamicConfigProviderJson, DynamicConfigProvider.class);
+      DynamicConfigProvider dynamicConfigProvider = configMapper.convertValue(
+          dynamicConfigProviderJson,
+          DynamicConfigProvider.class
+      );
       Map<String, String> dynamicConfig = dynamicConfigProvider.getConfig();
       for (Map.Entry<String, String> e : dynamicConfig.entrySet()) {
         properties.setProperty(e.getKey(), e.getValue());
@@ -344,7 +355,8 @@ private static Deserializer getKafkaDeserializer(Properties properties, String k
             deserializerReturnType.getTypeName());
       }
     }
-    catch (ClassNotFoundException | NoSuchMethodException | InstantiationException | IllegalAccessException | InvocationTargetException e) {
+    catch (ClassNotFoundException | NoSuchMethodException | InstantiationException | IllegalAccessException |
+           InvocationTargetException e) {
       throw new StreamException(e);
     }

indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/common/RecordSupplier.java

Lines changed: 9 additions & 1 deletion
@@ -128,7 +128,6 @@ boolean isOffsetAvailable(StreamPartition<PartitionIdType> partition,
    * returns the set of partitions under the given stream
    *
    * @param stream name of stream
-   *
    * @return set of partitions
    */
   Set<PartitionIdType> getPartitionIds(String stream);
@@ -143,6 +142,15 @@ default Map<PartitionIdType, SequenceOffsetType> getLatestSequenceNumbers(Set<St
     throw new UnsupportedOperationException();
   }
 
+  /**
+   * @return Kafka's `poll-idle-ratio-avg` metric, or its analog for Kinesis,
+   * required for the autoscaler to work correctly
+   */
+  default double getPollIdleRatioMetric()
+  {
+    throw new UnsupportedOperationException();
+  }
+
   /**
    * close the RecordSupplier
   */
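
Because the default implementation throws, only suppliers that can actually measure consumer idleness need to opt in. A minimal sketch of that throwing-default pattern, using stand-in types rather than Druid's real RecordSupplier interface:

// Stand-in types to illustrate the opt-in default-method pattern used above.
interface IdleAwareSupplier
{
  default double getPollIdleRatioMetric()
  {
    throw new UnsupportedOperationException();
  }
}

class KafkaLikeSupplier implements IdleAwareSupplier
{
  private volatile double lastIdleRatio = 1.0d; // refreshed by a consumer monitor

  @Override
  public double getPollIdleRatioMetric()
  {
    return lastIdleRatio;
  }
}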

indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java

Lines changed: 5 additions & 0 deletions
@@ -4371,6 +4371,11 @@ public ConcurrentHashMap<PartitionIdType, SequenceOffsetType> getPartitionOffset
     return partitionOffsets;
   }
 
+  public double getPollIdleRatioMetric()
+  {
+    return recordSupplier.getPollIdleRatioMetric();
+  }
+
   /**
    * Should never be called outside of tests.
   */

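The supervisor accessor simply delegates to its record supplier; per the note in KafkaConsumerMonitor, the value is meant to be forwarded into the metrics pipeline later. A hedged sketch of what that forwarding could look like, mirroring the emit pattern the monitor already uses (the metric name and call site are assumptions, not part of this commit):

import org.apache.druid.java.util.emitter.service.ServiceEmitter;
import org.apache.druid.java.util.emitter.service.ServiceMetricEvent;

public class IdleRatioForwarderSketch
{
  // Hypothetical call site: invoked on the supervisor's monitoring cadence.
  public static void emitIdleRatio(ServiceEmitter emitter, double pollIdleRatioAvg)
  {
    ServiceMetricEvent.Builder builder = ServiceMetricEvent.builder();
    emitter.emit(builder.setMetric("ingest/kafka/pollIdleRatioAvg", pollIdleRatioAvg));
  }
}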