Skip to content

Commit 2cbc657

Browse files
authored
Do not pass ThreadPool to DesiredBalanceComputer (#116590)
Relates #115511 (comment). `ThreadPool` is used here only to get time. (I've extracted this out of #116333).
1 parent e0aa1ad commit 2cbc657

File tree

5 files changed

+44
-44
lines changed

5 files changed

+44
-44
lines changed

server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/DesiredBalanceComputer.java

Lines changed: 1 addition & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,6 @@
2626
import org.elasticsearch.core.Strings;
2727
import org.elasticsearch.core.TimeValue;
2828
import org.elasticsearch.index.shard.ShardId;
29-
import org.elasticsearch.threadpool.ThreadPool;
3029

3130
import java.util.ArrayList;
3231
import java.util.HashMap;
@@ -74,11 +73,7 @@ public class DesiredBalanceComputer {
7473
private TimeValue progressLogInterval;
7574
private long maxBalanceComputationTimeDuringIndexCreationMillis;
7675

77-
public DesiredBalanceComputer(ClusterSettings clusterSettings, ThreadPool threadPool, ShardsAllocator delegateAllocator) {
78-
this(clusterSettings, delegateAllocator, threadPool::relativeTimeInMillis);
79-
}
80-
81-
DesiredBalanceComputer(ClusterSettings clusterSettings, ShardsAllocator delegateAllocator, LongSupplier timeSupplierMillis) {
76+
public DesiredBalanceComputer(ClusterSettings clusterSettings, LongSupplier timeSupplierMillis, ShardsAllocator delegateAllocator) {
8277
this.delegateAllocator = delegateAllocator;
8378
this.timeSupplierMillis = timeSupplierMillis;
8479
clusterSettings.initializeAndWatch(PROGRESS_LOG_INTERVAL_SETTING, value -> this.progressLogInterval = value);

server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/DesiredBalanceShardsAllocator.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -91,7 +91,7 @@ public DesiredBalanceShardsAllocator(
9191
delegateAllocator,
9292
threadPool,
9393
clusterService,
94-
new DesiredBalanceComputer(clusterSettings, threadPool, delegateAllocator),
94+
new DesiredBalanceComputer(clusterSettings, threadPool::relativeTimeInMillis, delegateAllocator),
9595
reconciler,
9696
telemetryProvider
9797
);

server/src/test/java/org/elasticsearch/action/admin/cluster/allocation/TransportDeleteDesiredBalanceActionTests.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -101,7 +101,7 @@ public void testDeleteDesiredBalance() throws Exception {
101101
var clusterSettings = ClusterSettings.createBuiltInClusterSettings(settings);
102102

103103
var delegate = new BalancedShardsAllocator();
104-
var computer = new DesiredBalanceComputer(clusterSettings, threadPool, delegate) {
104+
var computer = new DesiredBalanceComputer(clusterSettings, threadPool::relativeTimeInMillis, delegate) {
105105

106106
final AtomicReference<DesiredBalance> lastComputationInput = new AtomicReference<>();
107107

server/src/test/java/org/elasticsearch/cluster/routing/allocation/allocator/DesiredBalanceComputerTests.java

Lines changed: 27 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -53,7 +53,6 @@
5353
import org.elasticsearch.snapshots.SnapshotId;
5454
import org.elasticsearch.snapshots.SnapshotShardSizeInfo;
5555
import org.elasticsearch.test.MockLog;
56-
import org.elasticsearch.threadpool.ThreadPool;
5756

5857
import java.util.ArrayList;
5958
import java.util.HashMap;
@@ -85,8 +84,6 @@
8584
import static org.hamcrest.Matchers.hasEntry;
8685
import static org.hamcrest.Matchers.lessThanOrEqualTo;
8786
import static org.hamcrest.Matchers.notNullValue;
88-
import static org.mockito.Mockito.mock;
89-
import static org.mockito.Mockito.when;
9087

9188
public class DesiredBalanceComputerTests extends ESAllocationTestCase {
9289

@@ -1205,43 +1202,43 @@ public void testShouldLogComputationIteration() {
12051202
}
12061203

12071204
private void checkIterationLogging(int iterations, long eachIterationDuration, MockLog.AbstractEventExpectation expectation) {
1208-
1209-
var mockThreadPool = mock(ThreadPool.class);
12101205
var currentTime = new AtomicLong(0L);
1211-
when(mockThreadPool.relativeTimeInMillis()).thenAnswer(invocation -> currentTime.addAndGet(eachIterationDuration));
1212-
12131206
// Some runs of this test try to simulate a long desired balance computation. Setting a high value on the following setting
12141207
// prevents interrupting a long computation.
12151208
var clusterSettings = createBuiltInClusterSettings(
12161209
Settings.builder().put(DesiredBalanceComputer.MAX_BALANCE_COMPUTATION_TIME_DURING_INDEX_CREATION_SETTING.getKey(), "2m").build()
12171210
);
1218-
var desiredBalanceComputer = new DesiredBalanceComputer(clusterSettings, mockThreadPool, new ShardsAllocator() {
1219-
@Override
1220-
public void allocate(RoutingAllocation allocation) {
1221-
final var unassignedIterator = allocation.routingNodes().unassigned().iterator();
1222-
while (unassignedIterator.hasNext()) {
1223-
final var shardRouting = unassignedIterator.next();
1224-
if (shardRouting.primary()) {
1225-
unassignedIterator.initialize("node-0", null, 0L, allocation.changes());
1226-
} else {
1227-
unassignedIterator.removeAndIgnore(UnassignedInfo.AllocationStatus.NO_ATTEMPT, allocation.changes());
1211+
var desiredBalanceComputer = new DesiredBalanceComputer(
1212+
clusterSettings,
1213+
() -> currentTime.addAndGet(eachIterationDuration),
1214+
new ShardsAllocator() {
1215+
@Override
1216+
public void allocate(RoutingAllocation allocation) {
1217+
final var unassignedIterator = allocation.routingNodes().unassigned().iterator();
1218+
while (unassignedIterator.hasNext()) {
1219+
final var shardRouting = unassignedIterator.next();
1220+
if (shardRouting.primary()) {
1221+
unassignedIterator.initialize("node-0", null, 0L, allocation.changes());
1222+
} else {
1223+
unassignedIterator.removeAndIgnore(UnassignedInfo.AllocationStatus.NO_ATTEMPT, allocation.changes());
1224+
}
12281225
}
1229-
}
12301226

1231-
// move shard on each iteration
1232-
for (var shard : allocation.routingNodes().node("node-0").shardsWithState(STARTED).toList()) {
1233-
allocation.routingNodes().relocateShard(shard, "node-1", 0L, "test", allocation.changes());
1234-
}
1235-
for (var shard : allocation.routingNodes().node("node-1").shardsWithState(STARTED).toList()) {
1236-
allocation.routingNodes().relocateShard(shard, "node-0", 0L, "test", allocation.changes());
1227+
// move shard on each iteration
1228+
for (var shard : allocation.routingNodes().node("node-0").shardsWithState(STARTED).toList()) {
1229+
allocation.routingNodes().relocateShard(shard, "node-1", 0L, "test", allocation.changes());
1230+
}
1231+
for (var shard : allocation.routingNodes().node("node-1").shardsWithState(STARTED).toList()) {
1232+
allocation.routingNodes().relocateShard(shard, "node-0", 0L, "test", allocation.changes());
1233+
}
12371234
}
1238-
}
12391235

1240-
@Override
1241-
public ShardAllocationDecision decideShardAllocation(ShardRouting shard, RoutingAllocation allocation) {
1242-
throw new AssertionError("only used for allocation explain");
1236+
@Override
1237+
public ShardAllocationDecision decideShardAllocation(ShardRouting shard, RoutingAllocation allocation) {
1238+
throw new AssertionError("only used for allocation explain");
1239+
}
12431240
}
1244-
});
1241+
);
12451242

12461243
assertThatLogger(() -> {
12471244
var iteration = new AtomicInteger(0);
@@ -1349,7 +1346,7 @@ public ShardAllocationDecision decideShardAllocation(ShardRouting shard, Routing
13491346
}
13501347

13511348
private static DesiredBalanceComputer createDesiredBalanceComputer(ShardsAllocator allocator) {
1352-
return new DesiredBalanceComputer(createBuiltInClusterSettings(), mock(ThreadPool.class), allocator);
1349+
return new DesiredBalanceComputer(createBuiltInClusterSettings(), () -> 0L, allocator);
13531350
}
13541351

13551352
private static void assertDesiredAssignments(DesiredBalance desiredBalance, Map<ShardId, ShardAssignment> expected) {

server/src/test/java/org/elasticsearch/cluster/routing/allocation/allocator/DesiredBalanceShardsAllocatorTests.java

Lines changed: 14 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -396,7 +396,7 @@ public ShardAllocationDecision decideShardAllocation(ShardRouting shard, Routing
396396
shardsAllocator,
397397
threadPool,
398398
clusterService,
399-
new DesiredBalanceComputer(clusterSettings, shardsAllocator, time::get) {
399+
new DesiredBalanceComputer(clusterSettings, time::get, shardsAllocator) {
400400
@Override
401401
public DesiredBalance compute(
402402
DesiredBalance previousDesiredBalance,
@@ -522,7 +522,7 @@ public ClusterState apply(ClusterState clusterState, RerouteStrategy routingAllo
522522
shardsAllocator,
523523
threadPool,
524524
clusterService,
525-
new DesiredBalanceComputer(clusterSettings, threadPool, shardsAllocator) {
525+
new DesiredBalanceComputer(clusterSettings, threadPool::relativeTimeInMillis, shardsAllocator) {
526526
@Override
527527
public DesiredBalance compute(
528528
DesiredBalance previousDesiredBalance,
@@ -625,7 +625,7 @@ public ClusterState apply(ClusterState clusterState, RerouteStrategy routingAllo
625625
shardsAllocator,
626626
threadPool,
627627
clusterService,
628-
new DesiredBalanceComputer(clusterSettings, threadPool, shardsAllocator) {
628+
new DesiredBalanceComputer(clusterSettings, threadPool::relativeTimeInMillis, shardsAllocator) {
629629
@Override
630630
public DesiredBalance compute(
631631
DesiredBalance previousDesiredBalance,
@@ -712,7 +712,7 @@ public void testResetDesiredBalance() {
712712
var delegateAllocator = createShardsAllocator();
713713
var clusterSettings = createBuiltInClusterSettings();
714714

715-
var desiredBalanceComputer = new DesiredBalanceComputer(clusterSettings, threadPool, delegateAllocator) {
715+
var desiredBalanceComputer = new DesiredBalanceComputer(clusterSettings, threadPool::relativeTimeInMillis, delegateAllocator) {
716716

717717
final AtomicReference<DesiredBalance> lastComputationInput = new AtomicReference<>();
718718

@@ -780,7 +780,11 @@ public void testResetDesiredBalanceOnNoLongerMaster() {
780780
var clusterService = ClusterServiceUtils.createClusterService(clusterState, threadPool);
781781

782782
var delegateAllocator = createShardsAllocator();
783-
var desiredBalanceComputer = new DesiredBalanceComputer(createBuiltInClusterSettings(), threadPool, delegateAllocator);
783+
var desiredBalanceComputer = new DesiredBalanceComputer(
784+
createBuiltInClusterSettings(),
785+
threadPool::relativeTimeInMillis,
786+
delegateAllocator
787+
);
784788
var desiredBalanceShardsAllocator = new DesiredBalanceShardsAllocator(
785789
delegateAllocator,
786790
threadPool,
@@ -829,7 +833,11 @@ public void testResetDesiredBalanceOnNodeShutdown() {
829833

830834
final var resetCalled = new AtomicBoolean();
831835
var delegateAllocator = createShardsAllocator();
832-
var desiredBalanceComputer = new DesiredBalanceComputer(createBuiltInClusterSettings(), threadPool, delegateAllocator);
836+
var desiredBalanceComputer = new DesiredBalanceComputer(
837+
createBuiltInClusterSettings(),
838+
threadPool::relativeTimeInMillis,
839+
delegateAllocator
840+
);
833841
var desiredBalanceAllocator = new DesiredBalanceShardsAllocator(
834842
delegateAllocator,
835843
threadPool,

0 commit comments

Comments
 (0)