99
1010package org .elasticsearch .cluster .routing .allocation .allocator ;
1111
12- import com .carrotsearch .hppc .ObjectLongHashMap ;
13- import com .carrotsearch .hppc .ObjectLongMap ;
14-
1512import org .elasticsearch .cluster .routing .RoutingNodes ;
1613import org .elasticsearch .cluster .routing .ShardRouting ;
1714import org .elasticsearch .cluster .routing .allocation .RoutingAllocation ;
2017import org .elasticsearch .common .settings .Setting ;
2118import org .elasticsearch .common .time .TimeProvider ;
2219import org .elasticsearch .core .TimeValue ;
23- import org .elasticsearch .core . Tuple ;
20+ import org .elasticsearch .index . shard . ShardId ;
2421import org .elasticsearch .logging .LogManager ;
2522import org .elasticsearch .logging .Logger ;
2623
24+ import java .util .HashMap ;
25+ import java .util .Map ;
2726import java .util .stream .Collectors ;
28- import java .util .stream .StreamSupport ;
2927
3028/**
3129 * Keeps track of a limited number of shards that are currently in undesired allocations. If the
@@ -70,7 +68,7 @@ public class UndesiredAllocationsTracker {
7068 );
7169
7270 private final TimeProvider timeProvider ;
73- private final ObjectLongHashMap < ShardRouting > undesiredAllocations = new ObjectLongHashMap <>();
71+ private final Map < String , UndesiredAllocation > undesiredAllocations = new HashMap <>();
7472 private final FrequencyCappedAction undesiredAllocationDurationLogInterval ;
7573 private volatile TimeValue undesiredAllocationDurationLoggingThreshold ;
7674 private volatile int maxUndesiredAllocationsToTrack ;
@@ -94,30 +92,36 @@ public class UndesiredAllocationsTracker {
9492 */
9593 public void trackUndesiredAllocation (ShardRouting shardRouting ) {
9694 assert shardRouting .unassigned () == false : "Shouldn't record unassigned shards as undesired allocations" ;
97- if (undesiredAllocations .size () < maxUndesiredAllocationsToTrack && undesiredAllocations .containsKey (shardRouting ) == false ) {
98- undesiredAllocations .put (shardRouting , timeProvider .relativeTimeInMillis ());
95+ if (undesiredAllocations .size () < maxUndesiredAllocationsToTrack ) {
96+ final var allocationId = shardRouting .allocationId ().getId ();
97+ if (undesiredAllocations .containsKey (allocationId ) == false ) {
98+ undesiredAllocations .put (
99+ allocationId ,
100+ new UndesiredAllocation (shardRouting .shardId (), timeProvider .relativeTimeInMillis ())
101+ );
102+ }
99103 }
100104 }
101105
102106 /**
103107 * Remove any tracking of the specified allocation (a no-op if the allocation isn't being tracked)
104108 */
105109 public void removeTracking (ShardRouting shardRouting ) {
106- undesiredAllocations .remove (shardRouting );
110+ if (shardRouting .unassigned () == false ) {
111+ undesiredAllocations .remove (shardRouting .allocationId ().getId ());
112+ } else {
113+ assert false : "Shouldn't remove tracking of unassigned shards" ;
114+ }
107115 }
108116
109117 /**
110118 * Clear any {@link ShardRouting} that are no longer present in the routing nodes
111119 */
112120 public void cleanup (RoutingNodes routingNodes ) {
113- undesiredAllocations .removeAll (shardRouting -> {
114- final var allocationId = shardRouting .allocationId ();
115- if (allocationId != null ) {
116- return routingNodes .getByAllocationId (shardRouting .shardId (), allocationId .getId ()) == null ;
117- } else {
118- assert false : "Unassigned shards shouldn't be marked as undesired" ;
119- return true ;
120- }
121+ undesiredAllocations .entrySet ().removeIf (e -> {
122+ final var undesiredAllocation = e .getValue ();
123+ final var allocationId = e .getKey ();
124+ return routingNodes .getByAllocationId (undesiredAllocation .shardId (), allocationId ) == null ;
121125 });
122126 shrinkIfOversized ();
123127 }
@@ -140,9 +144,9 @@ public void maybeLogUndesiredShardsWarning(
140144 ) {
141145 final long currentTimeMillis = timeProvider .relativeTimeInMillis ();
142146 long earliestUndesiredTimestamp = Long .MAX_VALUE ;
143- for (var allocation : undesiredAllocations ) {
144- if (allocation . value < earliestUndesiredTimestamp ) {
145- earliestUndesiredTimestamp = allocation . value ;
147+ for (var undesiredAllocation : undesiredAllocations . values () ) {
148+ if (undesiredAllocation . undesiredSince () < earliestUndesiredTimestamp ) {
149+ earliestUndesiredTimestamp = undesiredAllocation . undesiredSince () ;
146150 }
147151 }
148152 if (earliestUndesiredTimestamp < currentTimeMillis
@@ -160,15 +164,22 @@ private void logDecisionsForUndesiredShardsOverThreshold(
160164 ) {
161165 final long currentTimeMillis = timeProvider .relativeTimeInMillis ();
162166 final long loggingThresholdTimestamp = currentTimeMillis - undesiredAllocationDurationLoggingThreshold .millis ();
163- for (var allocation : undesiredAllocations ) {
164- if (allocation .value < loggingThresholdTimestamp ) {
165- logUndesiredShardDetails (
166- allocation .key ,
167- TimeValue .timeValueMillis (currentTimeMillis - allocation .value ),
168- routingNodes ,
169- routingAllocation ,
170- desiredBalance
171- );
167+ for (var allocation : undesiredAllocations .entrySet ()) {
168+ final var undesiredAllocation = allocation .getValue ();
169+ final var allocationId = allocation .getKey ();
170+ if (undesiredAllocation .undesiredSince () < loggingThresholdTimestamp ) {
171+ final var shardRouting = routingNodes .getByAllocationId (undesiredAllocation .shardId (), allocationId );
172+ if (shardRouting != null ) {
173+ logUndesiredShardDetails (
174+ shardRouting ,
175+ TimeValue .timeValueMillis (currentTimeMillis - undesiredAllocation .undesiredSince ()),
176+ routingNodes ,
177+ routingAllocation ,
178+ desiredBalance
179+ );
180+ } else {
181+ assert false : undesiredAllocation + " for allocationID " + allocationId + " was not cleaned up" ;
182+ }
172183 }
173184 }
174185 }
@@ -200,19 +211,28 @@ private void logUndesiredShardDetails(
200211 */
201212 private void shrinkIfOversized () {
202213 if (undesiredAllocations .size () > maxUndesiredAllocationsToTrack ) {
203- final var newestExcessValues = StreamSupport .stream (undesiredAllocations .spliterator (), false )
204- // we need to take a copy from the cursor because the cursors are re-used, so don't work with #sorted
205- .map (cursor -> new Tuple <>(cursor .key , cursor .value ))
206- .sorted ((a , b ) -> Long .compare (b .v2 (), a .v2 ()))
214+ final var newestExcessAllocationIds = undesiredAllocations .entrySet ()
215+ .stream ()
216+ .sorted ((a , b ) -> Long .compare (b .getValue ().undesiredSince (), a .getValue ().undesiredSince ()))
207217 .limit (undesiredAllocations .size () - maxUndesiredAllocationsToTrack )
208- .map (Tuple :: v1 )
218+ .map (Map . Entry :: getKey )
209219 .collect (Collectors .toSet ());
210- undesiredAllocations .removeAll (newestExcessValues :: contains );
220+ undesiredAllocations .keySet (). removeAll (newestExcessAllocationIds );
211221 }
212222 }
213223
214224 // visible for testing
215- ObjectLongMap < ShardRouting > getUndesiredAllocations () {
216- return undesiredAllocations . clone ( );
225+ Map < String , UndesiredAllocation > getUndesiredAllocations () {
226+ return Map . copyOf ( undesiredAllocations );
217227 }
228+
229+ /**
230+ * Rather than storing the {@link ShardRouting}, we store a map of allocationId -> {@link UndesiredAllocation}
231+ * this is because the allocation ID will persist as long as a shard stays on the same node, but the
232+ * {@link ShardRouting} changes for a variety of reasons even when the shard doesn't move.
233+ *
234+ * @param shardId The shard ID
235+ * @param undesiredSince The timestamp when the shard was first observed in an undesired allocation
236+ */
237+ record UndesiredAllocation (ShardId shardId , long undesiredSince ) {}
218238}
0 commit comments