Skip to content

Commit 223aaa0

Browse files
committed
Iterate directly over contents of RoutingNode
Today `RoutingNode#initializing`, `RoutingNode#started` and `RoutingNode#relocating` all work by copying the result into a freshly allocated array. These collections are immutable and all the (production) callers immediately iterate over the result so the copy is unnecessary. Moreover the `started` collection in particular might be huge. This commit adjusts these methods to return an iterator over the underlying collection instead, asserting that the resulting iterators are never mutated. Closes ES-13364
1 parent bcb1402 commit 223aaa0

File tree

9 files changed

+91
-23
lines changed

9 files changed

+91
-23
lines changed

server/src/main/java/org/elasticsearch/cluster/routing/RoutingNode.java

Lines changed: 9 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010
package org.elasticsearch.cluster.routing;
1111

1212
import org.elasticsearch.cluster.node.DiscoveryNode;
13+
import org.elasticsearch.common.collect.Iterators;
1314
import org.elasticsearch.common.util.Maps;
1415
import org.elasticsearch.core.Nullable;
1516
import org.elasticsearch.index.Index;
@@ -213,18 +214,16 @@ void remove(ShardRouting shard) {
213214
assert invariant();
214215
}
215216

216-
private static final ShardRouting[] EMPTY_SHARD_ROUTING_ARRAY = new ShardRouting[0];
217-
218-
public ShardRouting[] initializing() {
219-
return initializingShards.toArray(EMPTY_SHARD_ROUTING_ARRAY);
217+
public Iterator<ShardRouting> initializing() {
218+
return Iterators.assertReadOnly(initializingShards.iterator());
220219
}
221220

222-
public ShardRouting[] relocating() {
223-
return relocatingShards.toArray(EMPTY_SHARD_ROUTING_ARRAY);
221+
public Iterator<ShardRouting> relocating() {
222+
return Iterators.assertReadOnly(relocatingShards.iterator());
224223
}
225224

226-
public ShardRouting[] started() {
227-
return startedShards.toArray(EMPTY_SHARD_ROUTING_ARRAY);
225+
public Iterator<ShardRouting> started() {
226+
return Iterators.assertReadOnly(startedShards.iterator());
228227
}
229228

230229
/**
@@ -313,6 +312,8 @@ public String toString() {
313312
return sb.toString();
314313
}
315314

315+
private static final ShardRouting[] EMPTY_SHARD_ROUTING_ARRAY = new ShardRouting[0];
316+
316317
public ShardRouting[] copyShards() {
317318
return shards.values().toArray(EMPTY_SHARD_ROUTING_ARRAY);
318319
}

server/src/main/java/org/elasticsearch/cluster/routing/allocation/RoutingAllocation.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -175,7 +175,9 @@ private static Map<String, Long> unaccountedSearchableSnapshotSizes(ClusterState
175175
DiskUsage usage = clusterInfo.getNodeMostAvailableDiskUsages().get(node.nodeId());
176176
ClusterInfo.ReservedSpace reservedSpace = clusterInfo.getReservedSpace(node.nodeId(), usage != null ? usage.path() : "");
177177
long totalSize = 0;
178-
for (ShardRouting shard : node.started()) {
178+
final var startedShardsIterator = node.started();
179+
while (startedShardsIterator.hasNext()) {
180+
final var shard = startedShardsIterator.next();
179181
if (shard.getExpectedShardSize() > 0
180182
&& clusterState.metadata().indexMetadata(shard.index()).isSearchableSnapshot()
181183
&& reservedSpace.containsShardId(shard.shardId()) == false

server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/DesiredBalanceComputer.java

Lines changed: 9 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
import org.elasticsearch.cluster.routing.allocation.allocator.DesiredBalanceShardsAllocator.ShardAllocationExplainer;
2626
import org.elasticsearch.cluster.routing.allocation.command.MoveAllocationCommand;
2727
import org.elasticsearch.cluster.routing.allocation.decider.Decision;
28+
import org.elasticsearch.common.collect.Iterators;
2829
import org.elasticsearch.common.metrics.MeanMetric;
2930
import org.elasticsearch.common.settings.ClusterSettings;
3031
import org.elasticsearch.common.settings.Setting;
@@ -38,6 +39,7 @@
3839
import java.util.ArrayList;
3940
import java.util.HashMap;
4041
import java.util.HashSet;
42+
import java.util.Iterator;
4143
import java.util.LinkedList;
4244
import java.util.List;
4345
import java.util.Map;
@@ -294,12 +296,13 @@ public DesiredBalance compute(
294296
final var rerouteExplanation = command.execute(routingAllocation, false);
295297
assert rerouteExplanation.decisions().type() != Decision.Type.NO : "should have thrown for NO decision";
296298
if (rerouteExplanation.decisions().type() != Decision.Type.NO) {
297-
final ShardRouting[] initializingShards = routingNodes.node(
299+
final Iterator<ShardRouting> initializingShardsIterator = routingNodes.node(
298300
routingAllocation.nodes().resolveNode(command.toNode()).getId()
299301
).initializing();
300-
assert initializingShards.length == 1
301-
: "expect exactly one relocating shard, but got: " + List.of(initializingShards);
302-
final var initializingShard = initializingShards[0];
302+
assert initializingShardsIterator.hasNext();
303+
final var initializingShard = initializingShardsIterator.next();
304+
assert initializingShardsIterator.hasNext() == false
305+
: "expect exactly one relocating shard, but got: " + Iterators.toList(initializingShardsIterator);
303306
assert routingAllocation.nodes()
304307
.resolveNode(command.fromNode())
305308
.getId()
@@ -535,11 +538,11 @@ static void maybeSimulateAlreadyStartedShards(
535538
// Find all shards that are started in RoutingNodes but have no data on corresponding node in ClusterInfo
536539
final var startedShards = new ArrayList<ShardRouting>();
537540
for (var routingNode : routingNodes) {
538-
for (var shardRouting : routingNode.started()) {
541+
routingNode.started().forEachRemaining(shardRouting -> {
539542
if (clusterInfo.hasShardMoved(shardRouting)) {
540543
startedShards.add(shardRouting);
541544
}
542-
}
545+
});
543546
}
544547
if (startedShards.isEmpty()) {
545548
return;

server/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/DiskThresholdDecider.java

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -108,7 +108,9 @@ public static long sizeOfUnaccountedShards(
108108
// no longer initializing because their recovery failed or was cancelled.
109109

110110
// Where reserved space is unavailable (e.g. stats are out-of-sync) compute a conservative estimate for initialising shards
111-
for (ShardRouting routing : node.initializing()) {
111+
final var initializingShardsIterator = node.initializing();
112+
while (initializingShardsIterator.hasNext()) {
113+
final var routing = initializingShardsIterator.next();
112114
// Space needs to be reserved only when initializing shards that are going to use additional space
113115
// that is not yet accounted for by `reservedSpace` in case of lengthy recoveries
114116
if (shouldReserveSpaceForInitializingShard(routing, metadata) && reservedSpace.containsShardId(routing.shardId()) == false) {
@@ -135,7 +137,9 @@ public static long sizeOfUnaccountedShards(
135137
totalSize += sizeOfUnaccountableSearchableSnapshotShards;
136138

137139
if (subtractShardsMovingAway) {
138-
for (ShardRouting routing : node.relocating()) {
140+
final var relocatingShardsIterator = node.relocating();
141+
while (relocatingShardsIterator.hasNext()) {
142+
final var routing = relocatingShardsIterator.next();
139143
if (dataPath.equals(clusterInfo.getDataPath(routing))) {
140144
ProjectMetadata project = metadata.projectFor(routing.index());
141145
totalSize -= getExpectedShardSize(

server/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/ThrottlingAllocationDecider.java

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -127,11 +127,17 @@ public Decision canAllocate(ShardRouting shardRouting, RoutingNode node, Routing
127127

128128
// Count the primaries currently doing recovery on the node, to ensure the primariesInitialRecoveries setting is obeyed.
129129
int primariesInRecovery = 0;
130-
for (ShardRouting shard : node.initializing()) {
130+
final var initializingShardsIterator = node.initializing();
131+
while (initializingShardsIterator.hasNext()) {
132+
final var shard = initializingShardsIterator.next();
131133
// when a primary shard is INITIALIZING, it can be because of *initial recovery* or *relocation from another node*
132134
// we only count initial recoveries here, so we need to make sure that relocating node is null
133135
if (shard.primary() && shard.relocatingNodeId() == null) {
134136
primariesInRecovery++;
137+
if (allocation.debugDecision() && primariesInRecovery >= primariesInitialRecoveries) {
138+
// bail out early if we don't need the final total
139+
return THROTTLE;
140+
}
135141
}
136142
}
137143
if (primariesInRecovery >= primariesInitialRecoveries) {

server/src/main/java/org/elasticsearch/common/collect/Iterators.java

Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99

1010
package org.elasticsearch.common.collect;
1111

12+
import org.elasticsearch.core.Assertions;
1213
import org.elasticsearch.core.Nullable;
1314

1415
import java.util.ArrayList;
@@ -507,6 +508,42 @@ public T next() {
507508
}
508509
}
509510

511+
/**
512+
* Adds a wrapper around {@code iterator} which asserts that {@link Iterator#remove()} is not called.
513+
*/
514+
public static <T> Iterator<T> assertReadOnly(final Iterator<T> iterator) {
515+
return Assertions.ENABLED ? new AssertReadOnlyIterator<>(Objects.requireNonNull(iterator)) : iterator;
516+
}
517+
518+
private static class AssertReadOnlyIterator<T> implements Iterator<T> {
519+
520+
private final Iterator<T> delegate;
521+
522+
AssertReadOnlyIterator(Iterator<T> delegate) {
523+
this.delegate = delegate;
524+
}
525+
526+
@Override
527+
public boolean hasNext() {
528+
return delegate.hasNext();
529+
}
530+
531+
@Override
532+
public T next() {
533+
return delegate.next();
534+
}
535+
536+
@Override
537+
public void forEachRemaining(Consumer<? super T> action) {
538+
delegate.forEachRemaining(action);
539+
}
540+
541+
@Override
542+
public void remove() {
543+
throw new AssertionError();
544+
}
545+
}
546+
510547
public static <T> boolean equals(Iterator<? extends T> iterator1, Iterator<? extends T> iterator2, BiPredicate<T, T> itemComparer) {
511548
if (iterator1 == null) {
512549
return iterator2 == null;

server/src/test/java/org/elasticsearch/cluster/routing/RoutingNodeTests.java

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -18,9 +18,8 @@
1818
import org.elasticsearch.test.ESTestCase;
1919

2020
import java.net.InetAddress;
21-
import java.util.Arrays;
21+
import java.util.HashSet;
2222
import java.util.Set;
23-
import java.util.stream.Collectors;
2423

2524
import static java.util.Collections.emptyMap;
2625
import static java.util.Collections.emptySet;
@@ -161,7 +160,9 @@ public void testReturnStartedShards() {
161160
}
162161

163162
private static Set<ShardId> startedShardsSet(RoutingNode routingNode) {
164-
return Arrays.stream(routingNode.started()).map(ShardRouting::shardId).collect(Collectors.toSet());
163+
final var result = new HashSet<ShardId>();
164+
routingNode.started().forEachRemaining(shardRouting -> result.add(shardRouting.shardId()));
165+
return result;
165166
}
166167

167168
}

server/src/test/java/org/elasticsearch/cluster/routing/allocation/allocator/DesiredBalanceComputerTests.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,7 @@
5151
import org.elasticsearch.cluster.routing.allocation.decider.ThrottlingAllocationDecider;
5252
import org.elasticsearch.common.Randomness;
5353
import org.elasticsearch.common.UUIDs;
54+
import org.elasticsearch.common.collect.Iterators;
5455
import org.elasticsearch.common.settings.ClusterSettings;
5556
import org.elasticsearch.common.settings.Settings;
5657
import org.elasticsearch.common.time.TimeProvider;
@@ -103,10 +104,10 @@
103104
import static org.hamcrest.Matchers.aMapWithSize;
104105
import static org.hamcrest.Matchers.allOf;
105106
import static org.hamcrest.Matchers.anyOf;
106-
import static org.hamcrest.Matchers.arrayWithSize;
107107
import static org.hamcrest.Matchers.equalTo;
108108
import static org.hamcrest.Matchers.everyItem;
109109
import static org.hamcrest.Matchers.hasEntry;
110+
import static org.hamcrest.Matchers.hasSize;
110111
import static org.hamcrest.Matchers.lessThanOrEqualTo;
111112
import static org.hamcrest.Matchers.notNullValue;
112113
import static org.mockito.Mockito.mock;
@@ -581,7 +582,7 @@ public void allocate(RoutingAllocation allocation) {
581582
allocation.routingNodes().getRelocatingShardCount(),
582583
equalTo(0)
583584
);
584-
assertThat(allocation.routingNodes().node("node-2").started(), arrayWithSize(2));
585+
assertThat(Iterators.toList(allocation.routingNodes().node("node-2").started()), hasSize(2));
585586
}
586587

587588
@Override

server/src/test/java/org/elasticsearch/common/collect/IteratorsTests.java

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -328,6 +328,19 @@ public void testCycle() {
328328
}
329329
}
330330

331+
public void testAssertReadOnly() {
332+
assumeTrue("assertions enabled", Assertions.ENABLED);
333+
final List<Integer> innerList = new ArrayList<>(List.of(1, 2, 3, 4));
334+
assertTrue(Iterators.equals(innerList.iterator(), Iterators.assertReadOnly(innerList.iterator()), Objects::equals));
335+
336+
final var readonly = Iterators.assertReadOnly(innerList.iterator());
337+
assertTrue(readonly.hasNext());
338+
assertEquals(Integer.valueOf(1), readonly.next());
339+
expectThrows(AssertionError.class, readonly::remove);
340+
341+
assertEquals(List.of(1, 2, 3, 4), innerList);
342+
}
343+
331344
public void testEquals() {
332345
final BiPredicate<Object, Object> notCalled = (a, b) -> { throw new AssertionError("not called"); };
333346

0 commit comments

Comments
 (0)