Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -146,11 +146,12 @@ default Map<PartitionIdType, SequenceOffsetType> getLatestSequenceNumbers(Set<St
* Average poll-to-idle ratio as reported by the stream consumer.
* A value of 0 represents that the consumer is never idle, i.e. always consuming.
* A value of 1 represents that the consumer is always idle, i.e. not receiving data.
* A negative value indicates that no valid metric is available.
* Used by the supervisor auto-scaler to find an optimal task count that minimizes idle time.
*/
default double getPollIdleRatioMetric()
{
throw new UnsupportedOperationException();
return -1;
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -400,12 +400,12 @@ static int computeExtraPPTIncrease(
* This metric indicates how much time the consumer spends idle waiting for data.
*
* @param taskStats the stats map from supervisor.getStats()
* @return the average poll-idle-ratio across all tasks, or 0 if no valid metrics are available
* @return the average poll-idle-ratio across all tasks, or -1 if no valid metrics are available
*/
static double extractPollIdleRatio(Map<String, Map<String, Object>> taskStats)
{
if (taskStats == null || taskStats.isEmpty()) {
return 0.;
return -1.;
}

double sum = 0;
Expand All @@ -425,7 +425,7 @@ static double extractPollIdleRatio(Map<String, Map<String, Object>> taskStats)
}
}

return count > 0 ? sum / count : 0.;
return count > 0 ? sum / count : -1.;
}

/**
Expand Down Expand Up @@ -543,7 +543,7 @@ private Either<String, Boolean> validateMetricsForScaling(CostMetrics metrics)
{
if (metrics == null) {
return Either.error("No metrics collected");
} else if (metrics.getAvgProcessingRate() < 0 || metrics.getPollIdleRatio() < 0) {
} else if (metrics.getAvgProcessingRate() < 0) {
return Either.error("Task metrics not available");
} else if (metrics.getCurrentTaskCount() <= 0 || metrics.getPartitionCount() <= 0) {
return Either.error("Supervisor metrics not available");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@
import org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisorIOConfig;
import org.apache.druid.java.util.emitter.service.ServiceEmitter;
import org.apache.druid.segment.incremental.RowIngestionMeters;
import org.hamcrest.MatcherAssert;
import org.hamcrest.Matchers;
import org.joda.time.Duration;
import org.junit.Assert;
import org.junit.Before;
Expand Down Expand Up @@ -179,6 +181,14 @@ public void testComputeOptimalTaskCount()
Assert.assertEquals(-1, autoScaler.computeOptimalTaskCount(createMetrics(100.0, 10, -5, 0.3)));
Assert.assertEquals(-1, autoScaler.computeOptimalTaskCount(createMetrics(100.0, -1, 100, 0.3)));

// Negative pollIdleRatio (metric unavailable) should still allow scaling
int unavailableIdleResult = autoScaler.computeOptimalTaskCount(createMetrics(100.0, 25, 100, -1.0));
MatcherAssert.assertThat(
"Negative pollIdleRatio should not reject scaling",
unavailableIdleResult,
Matchers.greaterThanOrEqualTo(1)
);

// High idle (underutilized) - should scale down
int scaleDownResult = autoScaler.computeOptimalTaskCount(createMetrics(100.0, 25, 100, 0.8));
Assert.assertTrue("Expected scale-down when idle ratio is high (>0.6)", scaleDownResult < 25);
Expand All @@ -195,26 +205,26 @@ public void testComputeOptimalTaskCount()
@Test
public void testExtractPollIdleRatio()
{
// Null and empty return 0
// Null and empty return -1 (no data)
Assert.assertEquals(
"Null stats should yield 0 idle ratios",
0.,
"Null stats should yield -1 idle ratio",
-1.,
CostBasedAutoScaler.extractPollIdleRatio(null),
0.0001
);
Assert.assertEquals(
"Empty stats should yield 0 idle ratios",
0.,
"Empty stats should yield -1 idle ratio",
-1.,
CostBasedAutoScaler.extractPollIdleRatio(Collections.emptyMap()),
0.0001
);

// Missing metrics return 0
// Missing metrics return -1 (no data)
Map<String, Map<String, Object>> missingMetrics = new HashMap<>();
missingMetrics.put("0", Collections.singletonMap("task-0", new HashMap<>()));
Assert.assertEquals(
"Missing autoscaler metrics should yield 0 idle ratios",
0.,
"Missing autoscaler metrics should yield -1 idle ratio",
-1.,
CostBasedAutoScaler.extractPollIdleRatio(missingMetrics),
0.0001
);
Expand All @@ -237,7 +247,7 @@ public void testExtractPollIdleRatio()
nonMapTask.put("0", Collections.singletonMap("task-0", "not-a-map"));
Assert.assertEquals(
"Non-map task stats should be ignored",
0.,
-1.,
CostBasedAutoScaler.extractPollIdleRatio(nonMapTask),
0.0001
);
Expand All @@ -248,8 +258,8 @@ public void testExtractPollIdleRatio()
taskStats1.put(SeekableStreamIndexTaskRunner.AUTOSCALER_METRICS_KEY, new HashMap<>());
emptyAutoscaler.put("0", Collections.singletonMap("task-0", taskStats1));
Assert.assertEquals(
"Empty autoscaler metrics should yield 0 idle ratios",
0.,
"Empty autoscaler metrics should yield -1 idle ratio",
-1.,
CostBasedAutoScaler.extractPollIdleRatio(emptyAutoscaler),
0.0001
);
Expand All @@ -261,7 +271,7 @@ public void testExtractPollIdleRatio()
nonMapAutoscaler.put("0", Collections.singletonMap("task-0", taskStats2));
Assert.assertEquals(
"Non-map autoscaler metrics should be ignored",
0.,
-1.,
CostBasedAutoScaler.extractPollIdleRatio(nonMapAutoscaler),
0.0001
);
Expand All @@ -275,7 +285,7 @@ public void testExtractPollIdleRatio()
nonNumberRatio.put("0", Collections.singletonMap("task-0", taskStats3));
Assert.assertEquals(
"Non-numeric poll idle ratio should be ignored",
0.,
-1.,
CostBasedAutoScaler.extractPollIdleRatio(nonNumberRatio),
0.0001
);
Expand Down
Loading