Skip to content

Commit 3cda48a

Browse files
committed
Address review comments - 3
1 parent 9ecafee commit 3cda48a

File tree

19 files changed

+119
-191
lines changed

19 files changed

+119
-191
lines changed

embedded-tests/src/test/java/org/apache/druid/testing/embedded/indexing/autoscaler/CostBasedAutoScalerIntegrationTest.java

Lines changed: 5 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,6 @@
2323
import org.apache.druid.indexing.kafka.KafkaIndexTaskModule;
2424
import org.apache.druid.indexing.kafka.simulate.KafkaResource;
2525
import org.apache.druid.indexing.kafka.supervisor.KafkaSupervisorSpec;
26-
import org.apache.druid.indexing.overlord.supervisor.SupervisorStatus;
2726
import org.apache.druid.indexing.seekablestream.supervisor.autoscaler.CostBasedAutoScaler;
2827
import org.apache.druid.indexing.seekablestream.supervisor.autoscaler.CostBasedAutoScalerConfig;
2928
import org.apache.druid.java.util.common.StringUtils;
@@ -42,6 +41,7 @@
4241
import org.hamcrest.Matchers;
4342
import org.joda.time.DateTime;
4443
import org.joda.time.DateTimeZone;
44+
import org.joda.time.Seconds;
4545
import org.junit.jupiter.api.Assertions;
4646
import org.junit.jupiter.api.Test;
4747
import org.junit.jupiter.api.Timeout;
@@ -134,8 +134,6 @@ public void test_autoScaler_computesOptimalTaskCountAndProduceScaleDown()
134134
.taskCountMin(1)
135135
.taskCountMax(100)
136136
.taskCountStart(initialTaskCount)
137-
.metricsCollectionIntervalMillis(1000)
138-
.scaleActionStartDelayMillis(1500)
139137
.scaleActionPeriodMillis(1500)
140138
.minTriggerScaleActionFrequencyMillis(3000)
141139
// Weight configuration: strongly favor lag reduction over idle time
@@ -149,21 +147,20 @@ public void test_autoScaler_computesOptimalTaskCountAndProduceScaleDown()
149147
Assertions.assertEquals(superId, cluster.callApi().postSupervisor(spec));
150148

151149
// Wait for the supervisor to be healthy and running
152-
waitForSupervisorRunning(superId);
150+
overlord.latchableEmitter().waitForEvent(event -> event.hasMetricName("task/run/time"));
153151

154152
// Wait for autoscaler to emit optimalTaskCount metric indicating scale-down
155153
// We expect the optimal task count to 4
156154
overlord.latchableEmitter().waitForEvent(
157155
event -> event.hasMetricName(OPTIMAL_TASK_COUNT_METRIC)
158-
.hasValueMatching(Matchers.equalTo(4L))
156+
.hasValueMatching(Matchers.equalTo(6L))
159157
);
160158

161159
// Suspend the supervisor
162160
cluster.callApi().postSupervisor(spec.createSuspendedSpec());
163161
}
164162

165163
@Test
166-
@Timeout(60)
167164
public void test_autoScaler_computesOptimalTaskCountAndProducesScaleUp()
168165
{
169166
final String superId = dataSource + "_super_scaleup";
@@ -185,8 +182,6 @@ public void test_autoScaler_computesOptimalTaskCountAndProducesScaleUp()
185182
.taskCountMin(1)
186183
.taskCountMax(50)
187184
.taskCountStart(lowInitialTaskCount)
188-
.metricsCollectionIntervalMillis(500)
189-
.scaleActionStartDelayMillis(500)
190185
.scaleActionPeriodMillis(500)
191186
.minTriggerScaleActionFrequencyMillis(1000)
192187
// Weight configuration: favor lag as the primary signal for scale-up scenarios
@@ -205,7 +200,7 @@ public void test_autoScaler_computesOptimalTaskCountAndProducesScaleUp()
205200
Assertions.assertEquals(superId, cluster.callApi().postSupervisor(kafkaSupervisorSpec));
206201

207202
// Wait for the supervisor to be healthy and running
208-
waitForSupervisorRunning(superId);
203+
overlord.latchableEmitter().waitForEvent(event -> event.hasMetricName("task/run/time"));
209204

210205
// First, wait for any optimalTaskCount metric to verify the autoscaler is running
211206
overlord.latchableEmitter().waitForEvent(
@@ -238,27 +233,6 @@ public void test_autoScaler_computesOptimalTaskCountAndProducesScaleUp()
238233
cluster.callApi().postSupervisor(kafkaSupervisorSpec.createSuspendedSpec());
239234
}
240235

241-
private void waitForSupervisorRunning(String supervisorId)
242-
{
243-
int maxAttempts = 10;
244-
int attempt = 0;
245-
while (attempt < maxAttempts) {
246-
SupervisorStatus status = cluster.callApi().getSupervisorStatus(supervisorId);
247-
if (status != null && "RUNNING".equals(status.getState()) && status.isHealthy()) {
248-
return;
249-
}
250-
attempt++;
251-
try {
252-
Thread.sleep(1000);
253-
}
254-
catch (InterruptedException e) {
255-
Thread.currentThread().interrupt();
256-
throw new RuntimeException(e);
257-
}
258-
}
259-
throw new AssertionError("Supervisor did not reach RUNNING state within timeout");
260-
}
261-
262236
private void produceRecordsToKafka(int recordCount, int iterations)
263237
{
264238
int recordCountPerSlice = recordCount / iterations;
@@ -302,6 +276,7 @@ private KafkaSupervisorSpec createKafkaSupervisorWithAutoScaler(
302276
ioConfig -> ioConfig
303277
.withConsumerProperties(kafkaServer.consumerProperties())
304278
.withTaskCount(taskCount)
279+
.withTaskDuration(Seconds.THREE.toPeriod())
305280
.withAutoScalerConfig(autoScalerConfig)
306281
)
307282
.withId(supervisorId)

extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -624,7 +624,7 @@ public void testRunAfterDataInsertedLiveReport() throws Exception
624624
break;
625625
}
626626
}
627-
Map rowStats = runner.doGetRowStats();
627+
Map rowStats = runner.doGetRowStats(false);
628628
Map totals = (Map) rowStats.get("totals");
629629
RowIngestionMetersTotals buildSegments = (RowIngestionMetersTotals) totals.get("buildSegments");
630630

extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4393,7 +4393,7 @@ public void testGetCurrentTotalStats()
43934393

43944394
replayAll();
43954395

4396-
Map<String, Map<String, Object>> stats = supervisor.getStats();
4396+
Map<String, Map<String, Object>> stats = supervisor.getStats(false);
43974397

43984398
verifyAll();
43994399

extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3762,7 +3762,7 @@ public void testGetCurrentTotalStats()
37623762

37633763
replayAll();
37643764

3765-
Map<String, Map<String, Object>> stats = supervisor.getStats();
3765+
Map<String, Map<String, Object>> stats = supervisor.getStats(false);
37663766

37673767
verifyAll();
37683768

indexing-service/src/main/java/org/apache/druid/indexing/common/task/AbstractTask.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -69,7 +69,7 @@ public enum IngestionMode
6969
{
7070
REPLACE, // replace with tombstones
7171
APPEND, // append to existing segments
72-
REPLACE_LEGACY, // original 'replace', it does not replace existing data for empty time chunks in input intervals
72+
REPLACE_LEGACY, // original replace, it does not replace existing data for empty time chunks in input intervals
7373
HADOOP, // non-native batch, hadoop ingestion
7474
NONE; // not an ingestion task (i.e. a kill task)
7575

indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorManager.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -318,7 +318,7 @@ public Optional<SupervisorReport> getSupervisorStatus(String id)
318318
public Optional<Map<String, Map<String, Object>>> getSupervisorStats(String id)
319319
{
320320
Pair<Supervisor, SupervisorSpec> supervisor = supervisors.get(id);
321-
return supervisor == null ? Optional.absent() : Optional.fromNullable(supervisor.lhs.getStats());
321+
return supervisor == null ? Optional.absent() : Optional.fromNullable(supervisor.lhs.getStats(false));
322322
}
323323

324324
public Optional<List<ParseExceptionReport>> getSupervisorParseErrors(String id)

indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskClient.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -134,6 +134,15 @@ ListenableFuture<Boolean> setEndOffsetsAsync(
134134
*/
135135
ListenableFuture<Map<String, Object>> getMovingAveragesAsync(String id);
136136

137+
/**
138+
* Get streamer metrics for a task. Retries on failure.
139+
*
140+
* Task-side is {@link SeekableStreamIndexTaskRunner#getRowStats}.
141+
*
142+
* @param id task id
143+
*/
144+
ListenableFuture<Map<String, Object>> getStreamerMetrics(String id);
145+
137146
/**
138147
* Get parse errors for a task. Retries on failure.
139148
*

indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskClientAsyncImpl.java

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -318,6 +318,23 @@ public ListenableFuture<Map<String, Object>> getMovingAveragesAsync(String id)
318318
.go();
319319
}
320320

321+
@Override
322+
public ListenableFuture<Map<String, Object>> getStreamerMetrics(String id)
323+
{
324+
return makeRequest(id, new RequestBuilder(HttpMethod.GET, "/rowStats?autoScalerStatsOnly=true"))
325+
.handler(new BytesFullResponseHandler())
326+
.onSuccess(r -> {
327+
if (isNullOrEmpty(r.getContent())) {
328+
log.warn("Got empty response when calling getStreamerMetrics, id[%s]", id);
329+
return null;
330+
} else {
331+
return JacksonUtils.readValue(jsonMapper, r.getContent(), JacksonUtils.TYPE_REFERENCE_MAP_STRING_OBJECT);
332+
}
333+
})
334+
.onNotAvailable(e -> Either.value(Collections.emptyMap()))
335+
.go();
336+
}
337+
321338
@Override
322339
public ListenableFuture<List<ParseExceptionReport>> getParseErrorsAsync(String id)
323340
{

indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java

Lines changed: 15 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -145,6 +145,9 @@
145145
public abstract class SeekableStreamIndexTaskRunner<PartitionIdType, SequenceOffsetType, RecordType extends ByteEntity>
146146
implements ChatHandler
147147
{
148+
public static final String AUTOSCALER_METRICS_KEY = "autoscalerMetrics";
149+
public static final String POLL_IDLE_RATIO_KEY = "pollIdleRatio";
150+
148151
private static final String CTX_KEY_LOOKUP_TIER = "lookupTier";
149152

150153
public enum Status
@@ -319,7 +322,7 @@ private Set<PartitionIdType> computeExclusiveStartPartitionsForSequence(
319322
/**
320323
* Returns the supervisorId for the task this runner is executing.
321324
* Backwards compatibility: if task spec from metadata has a null supervisorId field, falls back to dataSource
322-
*/
325+
*/
323326
public String getSupervisorId()
324327
{
325328
return task.getSupervisorId();
@@ -1210,7 +1213,6 @@ private Map<String, Object> getTaskCompletionRowStats()
12101213
return metrics;
12111214
}
12121215

1213-
12141216
private void maybePersistAndPublishSequences(Supplier<Committer> committerSupplier)
12151217
throws InterruptedException
12161218
{
@@ -1647,9 +1649,15 @@ public Response registerUpgradedPendingSegment(
16471649
}
16481650
}
16491651

1650-
public Map<String, Object> doGetRowStats()
1652+
public Map<String, Object> doGetRowStats(boolean autoScalerStatsOnly)
16511653
{
16521654
Map<String, Object> returnMap = new HashMap<>();
1655+
if (this.recordSupplier != null) {
1656+
returnMap.put(AUTOSCALER_METRICS_KEY, Map.of(POLL_IDLE_RATIO_KEY, this.recordSupplier.getPollIdleRatioMetric()));
1657+
}
1658+
if (autoScalerStatsOnly) {
1659+
return returnMap;
1660+
}
16531661
Map<String, Object> totalsMap = new HashMap<>();
16541662
Map<String, Object> averagesMap = new HashMap<>();
16551663

@@ -1664,9 +1672,6 @@ public Map<String, Object> doGetRowStats()
16641672

16651673
returnMap.put("movingAverages", averagesMap);
16661674
returnMap.put("totals", totalsMap);
1667-
if (this.recordSupplier != null) {
1668-
returnMap.put("pollIdleRatio", this.recordSupplier.getPollIdleRatioMetric());
1669-
}
16701675
return returnMap;
16711676
}
16721677

@@ -1678,7 +1683,7 @@ public Map<String, Object> doGetLiveReports()
16781683

16791684
payload.put("ingestionState", ingestionState);
16801685
payload.put("unparseableEvents", events);
1681-
payload.put("rowStats", doGetRowStats());
1686+
payload.put("rowStats", doGetRowStats(false));
16821687

16831688
ingestionStatsAndErrors.put("taskId", task.getId());
16841689
ingestionStatsAndErrors.put("payload", payload);
@@ -1691,11 +1696,12 @@ public Map<String, Object> doGetLiveReports()
16911696
@Path("/rowStats")
16921697
@Produces(MediaType.APPLICATION_JSON)
16931698
public Response getRowStats(
1694-
@Context final HttpServletRequest req
1699+
@Context final HttpServletRequest req,
1700+
@QueryParam("autoScalerStatsOnly") boolean autoScalerStatsOnly
16951701
)
16961702
{
16971703
authorizationCheck(req);
1698-
return Response.ok(doGetRowStats()).build();
1704+
return Response.ok(doGetRowStats(autoScalerStatsOnly)).build();
16991705
}
17001706

17011707
@GET

indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/common/RecordSupplier.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -143,8 +143,10 @@ default Map<PartitionIdType, SequenceOffsetType> getLatestSequenceNumbers(Set<St
143143
}
144144

145145
/**
146-
* @return Kafka's `poll-idle-ratio-avg` an it's analog for Kinesis,
147-
* required for correct autoscaler work
146+
* Average poll-to-idle ratio as reported by the stream consumer.
147+
* A value of 0 represents that the consumer is never idle, i.e. always consuming.
148+
* A value of 1 represents that the consumer is always idle, i.e. not receiving data.
149+
* Used by the supervisor auto-scaler to find an optimal task count that minimizes idle time.
148150
*/
149151
default double getPollIdleRatioMetric()
150152
{

0 commit comments

Comments
 (0)