Skip to content

Commit 6db1bc3

Browse files
committed
Discard excess tracking if the limit is reduced
1 parent 0af8050 commit 6db1bc3

File tree

4 files changed

+162
-35
lines changed

4 files changed

+162
-35
lines changed

server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/UndesiredAllocationsTracker.java

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010
package org.elasticsearch.cluster.routing.allocation.allocator;
1111

1212
import com.carrotsearch.hppc.ObjectLongHashMap;
13+
import com.carrotsearch.hppc.ObjectLongMap;
1314

1415
import org.elasticsearch.cluster.routing.RoutingNodes;
1516
import org.elasticsearch.cluster.routing.ShardRouting;
@@ -19,9 +20,13 @@
1920
import org.elasticsearch.common.settings.Setting;
2021
import org.elasticsearch.common.time.TimeProvider;
2122
import org.elasticsearch.core.TimeValue;
23+
import org.elasticsearch.core.Tuple;
2224
import org.elasticsearch.logging.LogManager;
2325
import org.elasticsearch.logging.Logger;
2426

27+
import java.util.stream.Collectors;
28+
import java.util.stream.StreamSupport;
29+
2530
/**
2631
* Keeps track of a limited number of shards that are currently in undesired allocations. If the
2732
* shards remain in undesired allocations for longer than a configurable threshold, it will log
@@ -114,6 +119,7 @@ public void cleanup(RoutingNodes routingNodes) {
114119
return true;
115120
}
116121
});
122+
shrinkIfOversized();
117123
}
118124

119125
/**
@@ -187,4 +193,26 @@ private void logUndesiredShardDetails(
187193
allocation.setDebugMode(originalDebugMode);
188194
}
189195
}
196+
197+
/**
198+
* If the maximum to track was reduced, and we are tracking more than the new maximum, purge the most recent entries
199+
* to bring us under the new limit
200+
*/
201+
private void shrinkIfOversized() {
202+
if (undesiredAllocations.size() > maxUndesiredAllocationsToTrack) {
203+
final var newestExcessValues = StreamSupport.stream(undesiredAllocations.spliterator(), false)
204+
// we need to take a copy from the cursor because the cursors are re-used, so don't work with #sorted
205+
.map(cursor -> new Tuple<>(cursor.key, cursor.value))
206+
.sorted((a, b) -> Long.compare(b.v2(), a.v2()))
207+
.limit(undesiredAllocations.size() - maxUndesiredAllocationsToTrack)
208+
.map(Tuple::v1)
209+
.collect(Collectors.toSet());
210+
undesiredAllocations.removeAll(newestExcessValues::contains);
211+
}
212+
}
213+
214+
// visible for testing
215+
ObjectLongMap<ShardRouting> getUndesiredAllocations() {
216+
return undesiredAllocations.clone();
217+
}
190218
}

server/src/test/java/org/elasticsearch/cluster/routing/allocation/allocator/DesiredBalanceReconcilerTests.java

Lines changed: 1 addition & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -57,7 +57,7 @@
5757
import org.elasticsearch.common.collect.ImmutableOpenMap;
5858
import org.elasticsearch.common.settings.ClusterSettings;
5959
import org.elasticsearch.common.settings.Settings;
60-
import org.elasticsearch.common.time.TimeProvider;
60+
import org.elasticsearch.common.time.AdvancingTimeProvider;
6161
import org.elasticsearch.common.util.Maps;
6262
import org.elasticsearch.common.util.concurrent.DeterministicTaskQueue;
6363
import org.elasticsearch.core.TimeValue;
@@ -84,15 +84,13 @@
8484
import java.util.Set;
8585
import java.util.concurrent.atomic.AtomicBoolean;
8686
import java.util.concurrent.atomic.AtomicInteger;
87-
import java.util.concurrent.atomic.AtomicLong;
8887
import java.util.concurrent.atomic.AtomicReference;
8988
import java.util.function.BiPredicate;
9089
import java.util.function.Consumer;
9190
import java.util.function.Predicate;
9291
import java.util.stream.Collectors;
9392
import java.util.stream.IntStream;
9493

95-
import static java.util.concurrent.TimeUnit.NANOSECONDS;
9694
import static org.elasticsearch.cluster.ClusterInfo.shardIdentifierFromRouting;
9795
import static org.elasticsearch.cluster.routing.RoutingNodesHelper.shardsWithState;
9896
import static org.elasticsearch.cluster.routing.ShardRoutingState.STARTED;
@@ -1750,36 +1748,4 @@ private static Settings throttleSettings() {
17501748
.put(CLUSTER_ROUTING_ALLOCATION_NODE_CONCURRENT_OUTGOING_RECOVERIES_SETTING.getKey(), 1000)
17511749
.build();
17521750
}
1753-
1754-
/**
1755-
* A time-provider that advances each time it's asked the time
1756-
*/
1757-
private static class AdvancingTimeProvider implements TimeProvider {
1758-
1759-
private final AtomicLong currentTimeMillis = new AtomicLong();
1760-
1761-
public void advanceByMillis(long milliseconds) {
1762-
currentTimeMillis.addAndGet(milliseconds);
1763-
}
1764-
1765-
@Override
1766-
public long relativeTimeInMillis() {
1767-
return currentTimeMillis.incrementAndGet();
1768-
}
1769-
1770-
@Override
1771-
public long relativeTimeInNanos() {
1772-
return NANOSECONDS.toNanos(relativeTimeInMillis());
1773-
}
1774-
1775-
@Override
1776-
public long rawRelativeTimeInMillis() {
1777-
return relativeTimeInMillis();
1778-
}
1779-
1780-
@Override
1781-
public long absoluteTimeInMillis() {
1782-
throw new UnsupportedOperationException("not supported");
1783-
}
1784-
}
17851751
}
Lines changed: 87 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,87 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the "Elastic License
4+
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
5+
* Public License v 1"; you may not use this file except in compliance with, at
6+
* your election, the "Elastic License 2.0", the "GNU Affero General Public
7+
* License v3.0 only", or the "Server Side Public License, v 1".
8+
*/
9+
10+
package org.elasticsearch.cluster.routing.allocation.allocator;
11+
12+
import org.elasticsearch.cluster.metadata.ProjectId;
13+
import org.elasticsearch.cluster.node.DiscoveryNodeUtils;
14+
import org.elasticsearch.cluster.node.DiscoveryNodes;
15+
import org.elasticsearch.cluster.routing.GlobalRoutingTable;
16+
import org.elasticsearch.cluster.routing.IndexRoutingTable;
17+
import org.elasticsearch.cluster.routing.RecoverySource;
18+
import org.elasticsearch.cluster.routing.RoutingNodes;
19+
import org.elasticsearch.cluster.routing.RoutingTable;
20+
import org.elasticsearch.cluster.routing.ShardRouting;
21+
import org.elasticsearch.cluster.routing.UnassignedInfo;
22+
import org.elasticsearch.common.settings.ClusterSettings;
23+
import org.elasticsearch.common.settings.Settings;
24+
import org.elasticsearch.common.time.AdvancingTimeProvider;
25+
import org.elasticsearch.index.Index;
26+
import org.elasticsearch.index.shard.ShardId;
27+
import org.elasticsearch.test.ESTestCase;
28+
29+
import java.util.stream.Collectors;
30+
import java.util.stream.IntStream;
31+
import java.util.stream.StreamSupport;
32+
33+
public class UndesiredAllocationsTrackerTests extends ESTestCase {
34+
35+
public void testNewestRecordsArePurgedWhenLimitIsDecreased() {
36+
final var initialMaximum = randomIntBetween(10, 20);
37+
final var clusterSettings = ClusterSettings.createBuiltInClusterSettings(
38+
Settings.builder().put(UndesiredAllocationsTracker.MAX_UNDESIRED_ALLOCATIONS_TO_TRACK.getKey(), initialMaximum).build()
39+
);
40+
final var advancingTimeProvider = new AdvancingTimeProvider();
41+
final var undesiredAllocationsTracker = new UndesiredAllocationsTracker(clusterSettings, advancingTimeProvider);
42+
final var indexName = randomIdentifier();
43+
final var index = new Index(indexName, indexName);
44+
final var indexRoutingTableBuilder = IndexRoutingTable.builder(index);
45+
final var discoveryNodesBuilder = DiscoveryNodes.builder();
46+
47+
// The shards with the lowest IDs will have the earliest timestamps
48+
for (int i = 0; i < initialMaximum; i++) {
49+
final var routing = createAssignedRouting(index, i, discoveryNodesBuilder);
50+
indexRoutingTableBuilder.addShard(routing);
51+
undesiredAllocationsTracker.trackUndesiredAllocation(routing);
52+
}
53+
final var routingNodes = RoutingNodes.immutable(
54+
GlobalRoutingTable.builder().put(ProjectId.DEFAULT, RoutingTable.builder().add(indexRoutingTableBuilder).build()).build(),
55+
discoveryNodesBuilder.build()
56+
);
57+
58+
// Reduce the maximum
59+
final var reducedMaximum = randomIntBetween(1, initialMaximum);
60+
clusterSettings.applySettings(
61+
Settings.builder().put(UndesiredAllocationsTracker.MAX_UNDESIRED_ALLOCATIONS_TO_TRACK.getKey(), reducedMaximum).build()
62+
);
63+
64+
// We shouldn't purge the entries from the setting updater thread
65+
assertEquals(initialMaximum, undesiredAllocationsTracker.getUndesiredAllocations().size());
66+
67+
// We should purge the most recent entries in #cleanup
68+
undesiredAllocationsTracker.cleanup(routingNodes);
69+
assertEquals(reducedMaximum, undesiredAllocationsTracker.getUndesiredAllocations().size());
70+
final var remainingShardIds = StreamSupport.stream(undesiredAllocationsTracker.getUndesiredAllocations().spliterator(), false)
71+
.map(olc -> olc.key.shardId().id())
72+
.collect(Collectors.toSet());
73+
assertEquals(IntStream.range(0, reducedMaximum).boxed().collect(Collectors.toSet()), remainingShardIds);
74+
}
75+
76+
private ShardRouting createAssignedRouting(Index index, int shardId, DiscoveryNodes.Builder discoveryNodesBuilder) {
77+
final var nodeId = randomAlphaOfLength(10);
78+
discoveryNodesBuilder.add(DiscoveryNodeUtils.create(nodeId));
79+
return ShardRouting.newUnassigned(
80+
new ShardId(index, shardId),
81+
true,
82+
RecoverySource.EmptyStoreRecoverySource.INSTANCE,
83+
new UnassignedInfo(UnassignedInfo.Reason.INDEX_CREATED, randomIdentifier()),
84+
randomFrom(ShardRouting.Role.DEFAULT, ShardRouting.Role.INDEX_ONLY)
85+
).initialize(nodeId, null, randomNonNegativeLong());
86+
}
87+
}
Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,46 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the "Elastic License
4+
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
5+
* Public License v 1"; you may not use this file except in compliance with, at
6+
* your election, the "Elastic License 2.0", the "GNU Affero General Public
7+
* License v3.0 only", or the "Server Side Public License, v 1".
8+
*/
9+
10+
package org.elasticsearch.common.time;
11+
12+
import java.util.concurrent.atomic.AtomicLong;
13+
14+
import static java.util.concurrent.TimeUnit.NANOSECONDS;
15+
16+
/**
17+
* A time-provider that advances each time it's asked the time
18+
*/
19+
public class AdvancingTimeProvider implements TimeProvider {
20+
21+
private final AtomicLong currentTimeMillis = new AtomicLong(System.currentTimeMillis());
22+
23+
public void advanceByMillis(long milliseconds) {
24+
currentTimeMillis.addAndGet(milliseconds);
25+
}
26+
27+
@Override
28+
public long relativeTimeInMillis() {
29+
return currentTimeMillis.incrementAndGet();
30+
}
31+
32+
@Override
33+
public long relativeTimeInNanos() {
34+
return NANOSECONDS.toNanos(relativeTimeInMillis());
35+
}
36+
37+
@Override
38+
public long rawRelativeTimeInMillis() {
39+
return relativeTimeInMillis();
40+
}
41+
42+
@Override
43+
public long absoluteTimeInMillis() {
44+
throw new UnsupportedOperationException("not supported");
45+
}
46+
}

0 commit comments

Comments
 (0)