Skip to content

Commit 286aa4e

Browse files
committed
Adjust computeValidTaskCounts implementation
1 parent 1647ee9 commit 286aa4e

File tree

2 files changed

+16
-19
lines changed

2 files changed

+16
-19
lines changed

indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/CostBasedAutoScaler.java

Lines changed: 15 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,6 @@
3131
import org.apache.druid.query.DruidMetrics;
3232

3333
import java.util.ArrayList;
34-
import java.util.Arrays;
3534
import java.util.List;
3635
import java.util.concurrent.Callable;
3736
import java.util.concurrent.ScheduledExecutorService;
@@ -163,7 +162,7 @@ public int computeOptimalTaskCount(CostMetrics metrics)
163162
return -1;
164163
}
165164

166-
final int[] validTaskCounts = CostBasedAutoScaler.computeValidTaskCounts(partitionCount);
165+
final int[] validTaskCounts = CostBasedAutoScaler.computeValidTaskCounts(partitionCount, currentTaskCount);
167166

168167
if (validTaskCounts.length == 0) {
169168
log.warn("No valid task counts after applying constraints for supervisorId [%s]", supervisorId);
@@ -189,19 +188,7 @@ public int computeOptimalTaskCount(CostMetrics metrics)
189188
int optimalTaskCount = -1;
190189
double optimalCost = Double.POSITIVE_INFINITY;
191190

192-
final int bestTaskCountIndex = Arrays.binarySearch(validTaskCounts, currentTaskCount);
193-
for (int i = bestTaskCountIndex - SCALE_DOWN_FACTOR_DISCRETE_DISTANCE;
194-
i <= bestTaskCountIndex + SCALE_UP_FACTOR_DISCRETE_DISTANCE; i++) {
195-
// Range check.
196-
if (i < 0 || i >= validTaskCounts.length) {
197-
continue;
198-
}
199-
int taskCount = validTaskCounts[i];
200-
if (taskCount < config.getTaskCountMin()) {
201-
continue;
202-
} else if (taskCount > config.getTaskCountMax()) {
203-
break;
204-
}
191+
for (int taskCount : validTaskCounts) {
205192
double cost = costFunction.computeCost(metrics, taskCount, config);
206193
log.debug("Proposed task count: %d, Cost: %.4f", taskCount, cost);
207194
if (cost < optimalCost) {
@@ -238,16 +225,26 @@ public int computeOptimalTaskCount(CostMetrics metrics)
238225
*
239226
* @return sorted list of valid task counts within bounds
240227
*/
241-
static int[] computeValidTaskCounts(int partitionCount)
228+
static int[] computeValidTaskCounts(int partitionCount, int currentTaskCount)
242229
{
243230
if (partitionCount <= 0) {
244231
return new int[]{};
245232
}
246233

247234
List<Integer> result = new ArrayList<>();
235+
final int currentPartitionsPerTask = partitionCount / currentTaskCount;
236+
// To avoid confusion: minimum partitions per task means maximum amount of tasks (scale up) and vice versa.
237+
final int minPartitionsPerTask = Math.max(1, currentPartitionsPerTask - SCALE_UP_FACTOR_DISCRETE_DISTANCE);
238+
final int maxPartitionsPerTask = Math.min(
239+
partitionCount,
240+
Math.max(
241+
minPartitionsPerTask,
242+
currentPartitionsPerTask + SCALE_DOWN_FACTOR_DISCRETE_DISTANCE
243+
)
244+
);
248245

249-
for (int partitionsPerTask = partitionCount; partitionsPerTask >= 1; partitionsPerTask--) {
250-
int taskCount = (partitionCount + partitionsPerTask - 1) / partitionsPerTask;
246+
for (int partitionsPerTask = maxPartitionsPerTask; partitionsPerTask >= minPartitionsPerTask; partitionsPerTask--) {
247+
final int taskCount = (partitionCount + partitionsPerTask - 1) / partitionsPerTask;
251248
if (result.isEmpty() || result.get(result.size() - 1) != taskCount) {
252249
result.add(taskCount);
253250
}

indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/CostBasedAutoScalerTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -77,7 +77,7 @@ public void testComputeValidTaskCountsGradualScaling()
7777
{
7878
// Verify gradual scaling: for 100 partitions, going from 25 tasks (4 partitions/task)
7979
// the next step should be 34 tasks (3 partitions/task)
80-
int[] validTaskCounts = CostBasedAutoScaler.computeValidTaskCounts(100);
80+
int[] validTaskCounts = CostBasedAutoScaler.computeValidTaskCounts(100, 25);
8181

8282
int idx25 = Arrays.binarySearch(validTaskCounts, 25);
8383
int idx34 = Arrays.binarySearch(validTaskCounts, 34);

0 commit comments

Comments
 (0)