Skip to content

Commit 276b98a

Browse files
committed
Extract stats via /rowStats call, remove /metrics call
1 parent 1e5281b commit 276b98a

File tree

10 files changed

+471
-243
lines changed

10 files changed

+471
-243
lines changed

COST_BASED_AUTOSCALER_ANALYSIS.md

Lines changed: 462 additions & 0 deletions
Large diffs are not rendered by default.

embedded-tests/src/test/java/org/apache/druid/testing/embedded/indexing/autoscaler/CostBasedAutoScalerIntegrationTest.java

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,6 @@
4242
import org.joda.time.DateTime;
4343
import org.joda.time.DateTimeZone;
4444
import org.junit.jupiter.api.Assertions;
45-
import org.junit.jupiter.api.Disabled;
4645
import org.junit.jupiter.api.Test;
4746
import org.junit.jupiter.api.Timeout;
4847

@@ -160,7 +159,7 @@ public void test_autoScaler_computesOptimalTaskCountAndProduceScaleDown()
160159
}
161160

162161
@Test
163-
@Disabled // Actually make it work, it does not ingest poverfully enough
162+
// @Disabled // Actually make it work, it does not ingest poverfully enough
164163
@Timeout(60)
165164
public void test_autoScaler_computesOptimalTaskCountAndProducesScaleUp()
166165
{
@@ -182,14 +181,14 @@ public void test_autoScaler_computesOptimalTaskCountAndProducesScaleUp()
182181
.taskCountMin(1)
183182
.taskCountMax(50)
184183
.taskCountStart(lowInitialTaskCount)
185-
.metricsCollectionIntervalMillis(200)
184+
.metricsCollectionIntervalMillis(500)
186185
.scaleActionStartDelayMillis(500)
187186
.scaleActionPeriodMillis(500)
188187
.minTriggerScaleActionFrequencyMillis(1000)
189188
// Weight configuration: favor lag as the primary signal for scale-up scenarios
190189
// High lag with low idle (overloaded) will trigger scale-up
191-
.lagWeight(0.7)
192-
.idleWeight(0.3)
190+
.lagWeight(0.3)
191+
.idleWeight(0.7)
193192
.build();
194193

195194
final KafkaSupervisorSpec kafkaSupervisorSpec = createKafkaSupervisorWithAutoScaler(

extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java

Lines changed: 0 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -4403,48 +4403,6 @@ public void testGetCurrentTotalStats()
44034403
Assert.assertEquals(ImmutableMap.of("task2", ImmutableMap.of("prop2", "val2")), stats.get("1"));
44044404
}
44054405

4406-
@Test
4407-
public void testGetTaskMetrics()
4408-
{
4409-
supervisor = getTestableSupervisor(1, 2, true, "PT1H", null, null, false, kafkaHost);
4410-
supervisor.addTaskGroupToActivelyReadingTaskGroup(
4411-
supervisor.getTaskGroupIdForPartition(new KafkaTopicPartition(false, topic, 0)),
4412-
singlePartitionMap(topic, 0, 0L),
4413-
null,
4414-
null,
4415-
ImmutableSet.of("task1"),
4416-
ImmutableSet.of()
4417-
);
4418-
4419-
supervisor.addTaskGroupToPendingCompletionTaskGroup(
4420-
supervisor.getTaskGroupIdForPartition(new KafkaTopicPartition(false, topic, 1)),
4421-
singlePartitionMap(topic, 0, 0L),
4422-
null,
4423-
null,
4424-
ImmutableSet.of("task2"),
4425-
ImmutableSet.of()
4426-
);
4427-
4428-
EasyMock.expect(taskClient.getMetrics("task1"))
4429-
.andReturn(Futures.immediateFuture(ImmutableMap.of("pollIdleRatio", 0.5)))
4430-
.times(1);
4431-
4432-
EasyMock.expect(taskClient.getMetrics("task2"))
4433-
.andReturn(Futures.immediateFuture(ImmutableMap.of("pollIdleRatio", 0.8)))
4434-
.times(1);
4435-
4436-
replayAll();
4437-
4438-
Map<String, Map<String, Object>> metrics = supervisor.getTaskMetrics();
4439-
4440-
verifyAll();
4441-
4442-
Assert.assertEquals(2, metrics.size());
4443-
Assert.assertEquals(ImmutableSet.of("0", "1"), metrics.keySet());
4444-
Assert.assertEquals(ImmutableMap.of("task1", ImmutableMap.of("pollIdleRatio", 0.5)), metrics.get("0"));
4445-
Assert.assertEquals(ImmutableMap.of("task2", ImmutableMap.of("pollIdleRatio", 0.8)), metrics.get("1"));
4446-
}
4447-
44484406
@Test
44494407
public void testGetCurrentParseErrors()
44504408
{

indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskClient.java

Lines changed: 0 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -80,13 +80,6 @@ ListenableFuture<TreeMap<Integer, Map<PartitionIdType, SequenceOffsetType>>> get
8080
*/
8181
ListenableFuture<DateTime> getStartTimeAsync(String id);
8282

83-
/**
84-
* Gets all available metrics for a task.
85-
*
86-
* @param id task id
87-
*/
88-
ListenableFuture<Map<String, Object>> getMetrics(String id);
89-
9083
/**
9184
* Pause a task.
9285
*

indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskClientAsyncImpl.java

Lines changed: 0 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -257,22 +257,6 @@ public ListenableFuture<DateTime> getStartTimeAsync(String id)
257257
.go();
258258
}
259259

260-
@Override
261-
public ListenableFuture<Map<String, Object>> getMetrics(String id)
262-
{
263-
return makeRequest(id, new RequestBuilder(HttpMethod.GET, "/metrics"))
264-
.handler(new BytesFullResponseHandler())
265-
.onSuccess(r -> {
266-
if (isNullOrEmpty(r.getContent())) {
267-
return null;
268-
} else {
269-
return JacksonUtils.readValue(jsonMapper, r.getContent(), JacksonUtils.TYPE_REFERENCE_MAP_STRING_OBJECT);
270-
}
271-
})
272-
.onNotAvailable(e -> Either.value(null))
273-
.go();
274-
}
275-
276260
@Override
277261
public ListenableFuture<Map<PartitionIdType, SequenceOffsetType>> pauseAsync(String id)
278262
{

indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java

Lines changed: 3 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -957,7 +957,6 @@ public void onFailure(Throwable t)
957957
}
958958
finally {
959959
try {
960-
this.recordSupplier = null;
961960
if (driver != null) {
962961
driver.close();
963962
}
@@ -1665,6 +1664,9 @@ public Map<String, Object> doGetRowStats()
16651664

16661665
returnMap.put("movingAverages", averagesMap);
16671666
returnMap.put("totals", totalsMap);
1667+
if (this.recordSupplier != null) {
1668+
returnMap.put("pollIdleRatio", this.recordSupplier.getPollIdleRatioMetric());
1669+
}
16681670
return returnMap;
16691671
}
16701672

@@ -1989,39 +1991,6 @@ public DateTime getStartTime(@Context final HttpServletRequest req)
19891991
return startTime;
19901992
}
19911993

1992-
/**
1993-
* Returns the poll-idle-ratio-avg metric from the underlying record supplier (e.g., Kafka consumer).
1994-
* This metric indicates how much time the consumer spends idle vs polling:
1995-
* <ul>
1996-
* <li>0 means the consumer is never idle (always consuming)</li>
1997-
* <li>1 means the consumer is always idle (not receiving data)</li>
1998-
* </ul>
1999-
*
2000-
* @return HTTP 200 with JSON containing pollIdleRatio, HTTP 503 if supplier not initialized,
2001-
* or HTTP 501 if the stream type doesn't support this metric
2002-
*/
2003-
@GET
2004-
@Path("/metrics")
2005-
@Produces(MediaType.APPLICATION_JSON)
2006-
public Response getMetrics(@Context final HttpServletRequest req)
2007-
{
2008-
authorizationCheck(req);
2009-
final RecordSupplier<PartitionIdType, SequenceOffsetType, RecordType> supplier = recordSupplier;
2010-
if (supplier == null) {
2011-
return Response.status(Response.Status.SERVICE_UNAVAILABLE)
2012-
.entity(ImmutableMap.of("error", "Record supplier isn't initialized"))
2013-
.build();
2014-
}
2015-
try {
2016-
return Response.ok(ImmutableMap.of("pollIdleRatio", supplier.getPollIdleRatioMetric())).build();
2017-
}
2018-
catch (UnsupportedOperationException e) {
2019-
return Response.status(501)
2020-
.entity(ImmutableMap.of("error", "Poll idle ratio metric isn't supported by this stream type"))
2021-
.build();
2022-
}
2023-
}
2024-
20251994
@VisibleForTesting
20261995
public long getNextCheckpointTime()
20271996
{

indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java

Lines changed: 1 addition & 94 deletions
Original file line numberDiff line numberDiff line change
@@ -1481,22 +1481,6 @@ public Map<String, Map<String, Object>> getStats()
14811481
}
14821482
}
14831483

1484-
@Override
1485-
public Map<String, Map<String, Object>> getTaskMetrics()
1486-
{
1487-
try {
1488-
return getCurrentTaskMetrics();
1489-
}
1490-
catch (InterruptedException ie) {
1491-
Thread.currentThread().interrupt();
1492-
log.warn(ie, "getTaskMetrics() interrupted.");
1493-
throw new RuntimeException(ie);
1494-
}
1495-
catch (ExecutionException ee) {
1496-
throw new RuntimeException(ee);
1497-
}
1498-
}
1499-
15001484
@Override
15011485
public List<ParseExceptionReport> getParseErrors()
15021486
{
@@ -1592,83 +1576,6 @@ private Map<String, Map<String, Object>> getCurrentTotalStats()
15921576
return allStats;
15931577
}
15941578

1595-
/**
1596-
* Collect metrics from all tasks managed by this supervisor.
1597-
* Note that {@code StatsFromTaskResult} are reused for metrics collection since
1598-
* both stats and metrics are represented as {@code Map<String, Object>}.
1599-
*
1600-
* @return A map of groupId->taskId->task metrics
1601-
* @throws InterruptedException
1602-
* @throws ExecutionException
1603-
*/
1604-
private Map<String, Map<String, Object>> getCurrentTaskMetrics()
1605-
throws InterruptedException, ExecutionException
1606-
{
1607-
Map<String, Map<String, Object>> allMetrics = new HashMap<>();
1608-
final List<ListenableFuture<StatsFromTaskResult>> futures = new ArrayList<>();
1609-
final List<Pair<Integer, String>> groupAndTaskIds = new ArrayList<>();
1610-
1611-
for (int groupId : activelyReadingTaskGroups.keySet()) {
1612-
TaskGroup group = activelyReadingTaskGroups.get(groupId);
1613-
for (String taskId : group.taskIds()) {
1614-
futures.add(
1615-
Futures.transform(
1616-
taskClient.getMetrics(taskId),
1617-
(Function<Map<String, Object>, StatsFromTaskResult>) (currentStats) -> new StatsFromTaskResult(
1618-
groupId,
1619-
taskId,
1620-
currentStats
1621-
),
1622-
MoreExecutors.directExecutor()
1623-
)
1624-
);
1625-
groupAndTaskIds.add(new Pair<>(groupId, taskId));
1626-
}
1627-
}
1628-
1629-
for (int groupId : pendingCompletionTaskGroups.keySet()) {
1630-
List<TaskGroup> pendingGroups = pendingCompletionTaskGroups.get(groupId);
1631-
for (TaskGroup pendingGroup : pendingGroups) {
1632-
for (String taskId : pendingGroup.taskIds()) {
1633-
futures.add(
1634-
Futures.transform(
1635-
taskClient.getMetrics(taskId),
1636-
(Function<Map<String, Object>, StatsFromTaskResult>) (currentStats) -> new StatsFromTaskResult(
1637-
groupId,
1638-
taskId,
1639-
currentStats
1640-
),
1641-
MoreExecutors.directExecutor()
1642-
)
1643-
);
1644-
groupAndTaskIds.add(new Pair<>(groupId, taskId));
1645-
}
1646-
}
1647-
}
1648-
1649-
List<Either<Throwable, StatsFromTaskResult>> results = coalesceAndAwait(futures);
1650-
for (int i = 0; i < results.size(); i++) {
1651-
if (results.get(i).isValue()) {
1652-
StatsFromTaskResult result = results.get(i).valueOrThrow();
1653-
1654-
if (result != null) {
1655-
Map<String, Object> groupMap = allMetrics.computeIfAbsent(result.getGroupId(), k -> new HashMap<>());
1656-
groupMap.put(result.getTaskId(), result.getStats());
1657-
}
1658-
} else {
1659-
Pair<Integer, String> groupAndTaskId = groupAndTaskIds.get(i);
1660-
log.noStackTrace().warn(
1661-
results.get(i).error(),
1662-
"Failed to get metrics for group[%d]-task[%s]",
1663-
groupAndTaskId.lhs,
1664-
groupAndTaskId.rhs
1665-
);
1666-
}
1667-
}
1668-
1669-
return allMetrics;
1670-
}
1671-
16721579
/**
16731580
* Collect parse errors from all tasks managed by this supervisor.
16741581
*
@@ -4472,7 +4379,7 @@ public ConcurrentHashMap<PartitionIdType, SequenceOffsetType> getPartitionOffset
44724379
*/
44734380
public double getPollIdleRatioMetric()
44744381
{
4475-
Map<String, Map<String, Object>> taskMetrics = getTaskMetrics();
4382+
Map<String, Map<String, Object>> taskMetrics = getStats();
44764383
if (taskMetrics.isEmpty()) {
44774384
return 1.;
44784385
}

indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/CostBasedAutoScaler.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -169,7 +169,7 @@ private void collectMetrics()
169169
* Returns -1 (no scaling needed) in the following cases:
170170
* <ul>
171171
* <li>Metrics are not available</li>
172-
* <li>Current idle ratio is in the ideal range [0.2, 0.6] - optimal utilization achieved</li>
172+
* <li>The current idle ratio is in the ideal range [0.2, 0.6] - optimal utilization achieved</li>
173173
* <li>Optimal task count equals current task count</li>
174174
* </ul>
175175
*

indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskClientAsyncImplTest.java

Lines changed: 0 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -342,45 +342,6 @@ public void test_getStartTimeAsync_notAvailable() throws Exception
342342
Assert.assertNull(client.getStartTimeAsync(TASK_ID).get());
343343
}
344344

345-
@Test
346-
public void test_getMetricsAsync() throws Exception
347-
{
348-
final Map<String, Object> retVal = ImmutableMap.of("pollIdleRatio", 0.5);
349-
350-
serviceClient.expectAndRespond(
351-
new RequestBuilder(HttpMethod.GET, "/metrics").timeout(httpTimeout),
352-
HttpResponseStatus.OK,
353-
Collections.emptyMap(),
354-
jsonMapper.writeValueAsBytes(retVal)
355-
);
356-
357-
Assert.assertEquals(retVal, client.getMetrics(TASK_ID).get());
358-
}
359-
360-
@Test
361-
public void test_getMetricsAsync_empty() throws Exception
362-
{
363-
serviceClient.expectAndRespond(
364-
new RequestBuilder(HttpMethod.GET, "/metrics").timeout(httpTimeout),
365-
HttpResponseStatus.OK,
366-
Collections.emptyMap(),
367-
ByteArrays.EMPTY_ARRAY
368-
);
369-
370-
Assert.assertNull(client.getMetrics(TASK_ID).get());
371-
}
372-
373-
@Test
374-
public void test_getMetricsAsync_notAvailable() throws Exception
375-
{
376-
serviceClient.expectAndThrow(
377-
new RequestBuilder(HttpMethod.GET, "/metrics").timeout(httpTimeout),
378-
new ServiceNotAvailableException(TASK_ID)
379-
);
380-
381-
Assert.assertNull(client.getMetrics(TASK_ID).get());
382-
}
383-
384345
@Test
385346
public void test_pauseAsync_immediateOk() throws Exception
386347
{

server/src/main/java/org/apache/druid/indexing/overlord/supervisor/Supervisor.java

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -71,11 +71,6 @@ default Map<String, Map<String, Object>> getStats()
7171
return ImmutableMap.of();
7272
}
7373

74-
default Map<String, Map<String, Object>> getTaskMetrics()
75-
{
76-
return ImmutableMap.of();
77-
}
78-
7974
default List<ParseExceptionReport> getParseErrors()
8075
{
8176
return ImmutableList.of();

0 commit comments

Comments
 (0)