Implement move non preferred phase in allocator #134429
BalancedShardsAllocator.java

@@ -114,6 +114,7 @@ public class BalancedShardsAllocator implements ShardsAllocator {
     private final BalancerSettings balancerSettings;
     private final WriteLoadForecaster writeLoadForecaster;
     private final BalancingWeightsFactory balancingWeightsFactory;
+    private final NonPreferredShardIteratorFactory nonPreferredShardIteratorFactory;

     public BalancedShardsAllocator() {
         this(Settings.EMPTY);
@@ -124,18 +125,25 @@ public BalancedShardsAllocator(Settings settings) {
     }

     public BalancedShardsAllocator(BalancerSettings balancerSettings, WriteLoadForecaster writeLoadForecaster) {
-        this(balancerSettings, writeLoadForecaster, new GlobalBalancingWeightsFactory(balancerSettings));
+        this(
+            balancerSettings,
+            writeLoadForecaster,
+            new GlobalBalancingWeightsFactory(balancerSettings),
+            NonPreferredShardIteratorFactory.NOOP
+        );
     }

     @Inject
     public BalancedShardsAllocator(
         BalancerSettings balancerSettings,
         WriteLoadForecaster writeLoadForecaster,
-        BalancingWeightsFactory balancingWeightsFactory
+        BalancingWeightsFactory balancingWeightsFactory,
+        NonPreferredShardIteratorFactory nonPreferredShardIteratorFactory
     ) {
         this.balancerSettings = balancerSettings;
         this.writeLoadForecaster = writeLoadForecaster;
         this.balancingWeightsFactory = balancingWeightsFactory;
+        this.nonPreferredShardIteratorFactory = nonPreferredShardIteratorFactory;
     }

     @Override
@@ -152,9 +160,16 @@ public void allocate(RoutingAllocation allocation) {
             return;
         }
         final BalancingWeights balancingWeights = balancingWeightsFactory.create();
-        final Balancer balancer = new Balancer(writeLoadForecaster, allocation, balancerSettings.getThreshold(), balancingWeights);
+        final Balancer balancer = new Balancer(
+            writeLoadForecaster,
+            allocation,
+            balancerSettings.getThreshold(),
+            balancingWeights,
+            nonPreferredShardIteratorFactory
+        );
         balancer.allocateUnassigned();
         balancer.moveShards();
+        balancer.moveNonPreferred();

         balancer.balance();

         // Node weights are calculated after each internal balancing round and saved to the RoutingNodes copy.
@@ -188,7 +203,8 @@ public ShardAllocationDecision decideShardAllocation(final ShardRouting shard, f
             writeLoadForecaster,
             allocation,
             balancerSettings.getThreshold(),
-            balancingWeightsFactory.create()
+            balancingWeightsFactory.create(),
+            nonPreferredShardIteratorFactory
         );
         AllocateUnassignedDecision allocateUnassignedDecision = AllocateUnassignedDecision.NOT_TAKEN;
         MoveDecision moveDecision = MoveDecision.NOT_TAKEN;
@@ -248,12 +264,14 @@ public static class Balancer {
         private final Map<String, ModelNode> nodes;
         private final BalancingWeights balancingWeights;
         private final NodeSorters nodeSorters;
+        private final NonPreferredShardIteratorFactory nonPreferredShardIteratorFactory;

         private Balancer(
             WriteLoadForecaster writeLoadForecaster,
             RoutingAllocation allocation,
             float threshold,
-            BalancingWeights balancingWeights
+            BalancingWeights balancingWeights,
+            NonPreferredShardIteratorFactory nonPreferredShardIteratorFactory
         ) {
             this.writeLoadForecaster = writeLoadForecaster;
             this.allocation = allocation;

@@ -266,6 +284,7 @@ private Balancer(
             nodes = Collections.unmodifiableMap(buildModelFromAssigned());
             this.nodeSorters = balancingWeights.createNodeSorters(nodesArray(), this);
             this.balancingWeights = balancingWeights;
+            this.nonPreferredShardIteratorFactory = nonPreferredShardIteratorFactory;
         }

         private static long getShardDiskUsageInBytes(ShardRouting shardRouting, IndexMetadata indexMetadata, ClusterInfo clusterInfo) {
@@ -711,6 +730,92 @@ protected int comparePivot(int j) {
             return indices;
         }

        /**
         * Move started shards that are in non-preferred allocations
         */
        public void moveNonPreferred() {
            while (moveASingleNonPreferredShard()) {
                // keep trying until we're unable to move any more
                // TODO: Update cluster info
            }
        }

        private boolean moveASingleNonPreferredShard() {
            for (ShardRouting shardRouting : nonPreferredShardIteratorFactory.createNonPreferredShardIterator(allocation)) {
Review thread on this iteration loop:

Reviewer: I expect you might be trying to be too generic. We know that we're dealing with the write load decider, and every shard will return not-preferred when there is a hot-spot. So we want a list of shards on a particular node (that's hot-spotting) ordered by write load estimate. Not a concrete suggestion, rather a general thought on the implementation approach.

Author: I was more explicit about that in my first attempt at this, but I don't think the approach was well received, as it's something of a departure from the way deciders currently work. What you suggest does actually happen in the […]. I build this list lazily because the hope is we don't have to iterate too far through it to find a shard that's movable.

Reviewer: I haven't taken a good look at your first attempt. My first thought would be to avoid using the deciders until we've got a list of ordered shards for a hot node to try to relocate. Start by filtering down to the nodes exceeding the queue latency threshold and discard the other nodes. Then the allocation deciders only come into play to select a new node assignment. We would be ignoring the WriteLoadDecider's canRemain method, and it's not obvious to me how not to ignore it 🤔 To move moveNonPreferred before moveShards, we'd have NO answers covering NOT_PREFERRED answers, which is another problem with using canRemain. We only need to look at hot-spotting nodes, and there's no need to create a relative order for the nodes.

Reviewer: I think my initial thought would be to run through all the nodes, call […]. I think we've discussed this, but maybe it was discarded? I think this prepares us better for multiple deciders saying not-preferred.

Author: Sorry @henningandersen, I did deviate a little from what we discussed, but thinking only from an optimisation perspective I figured it would be conceptually the same structure, i.e. my implementation does […] only because, if I understand correctly, you've advocated for […]. My thinking was that the latter approach would do loads of work up front (e.g. in a cluster with ~10,000 shards on each of multiple hot-spotted nodes) only to then move a single shard. The […]. I think you mentioned there may be cases where we could implement special logic if we knew the full set of shards that were moveable in that prioritisation logic, but it seems to me we should defer that cost until we identify some such scenarios?

Reviewer: If a node is write load hot-spotting, then canAllocate YES sounds like a nice filter.

Reviewer: I can't see a way for the balancer not to know about individual deciders for not-preferred / hot-spots. Suppose heap usage returned not-preferred (it doesn't, but for the sake of discussion). If the balancer checks all the deciders for canRemain NOT_PREFERRED and finds a hot-spot, we move on to correcting the hot-spot. However, to correct the hot-spot we need to know which resource is hot-spotting, because the shard order prioritization will be different for write load vs heap usage. I think the balancer needs to know about individual deciders to address hot-spots, in order to prioritize the shards for relocation. Alternatively, a decider would need to be responsible for providing a strategy for ordering shards: the AllocationDeciders would return a list of strategies, and the balancer runs a strategy per resource hot-spot.

Reviewer: I think the main case I want to add next is index anti-affinity, and there I think the strategy of picking a relevant loaded shard of the candidates is still good. But I agree we may want a more advanced strategy. It could, however, also look at the base data again, determining which of the moveable shards to pick based on the known dimensions. That could be as simple as "if the node has a queue latency, go by write-load; otherwise pick one, it does not matter which (some determinism may be preferable though)".
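As a rough illustration of the alternative suggested above (first filter to hot-spotting nodes, then order those nodes' shards by write load estimate), here is a minimal sketch using the ClusterInfo accessors that appear elsewhere in this diff. The method name, the descending sort order, and the queueLatencyThresholdMillis parameter are placeholders for illustration only, not names or settings from the PR; imports are omitted.

    // Sketch only: collect shards of hot-spotting nodes, ordered by descending write-load estimate.
    private List<ShardRouting> shardsOfHotSpottedNodesByWriteLoad(RoutingAllocation allocation, long queueLatencyThresholdMillis) {
        final var nodeUsage = allocation.clusterInfo().getNodeUsageStatsForThreadPools();
        final var shardWriteLoads = allocation.clusterInfo().getShardWriteLoads();
        final List<ShardRouting> candidates = new ArrayList<>();
        for (RoutingNode node : allocation.routingNodes()) {
            final var usage = nodeUsage.get(node.nodeId());
            if (usage == null) {
                continue; // no thread-pool stats for this node
            }
            final var writeStats = usage.threadPoolUsageStatsMap().get(ThreadPool.Names.WRITE);
            if (writeStats == null || writeStats.maxThreadPoolQueueLatencyMillis() < queueLatencyThresholdMillis) {
                continue; // not hot-spotting, skip the node entirely
            }
            for (ShardRouting shard : node) {
                if (shardWriteLoads.containsKey(shard.shardId())) {
                    candidates.add(shard); // only shards with known write load
                }
            }
        }
        // Highest write load first, so the most impactful move is attempted first.
        candidates.sort(Comparator.comparingDouble((ShardRouting s) -> shardWriteLoads.get(s.shardId())).reversed());
        return candidates;
    }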
                if (tryMoveShardIfNonPreferred(shardRouting)) {
                    return true;
                }
            }
            return false;
        }
        private boolean tryMoveShardIfNonPreferred(ShardRouting shardRouting) {
            ProjectIndex index = projectIndex(shardRouting);
            final MoveDecision moveDecision = decideMoveNonPreferred(index, shardRouting);
            if (moveDecision.isDecisionTaken() && moveDecision.forceMove()) {
                final ModelNode sourceNode = nodes.get(shardRouting.currentNodeId());
                final ModelNode targetNode = nodes.get(moveDecision.getTargetNode().getId());
                sourceNode.removeShard(index, shardRouting);
                Tuple<ShardRouting, ShardRouting> relocatingShards = routingNodes.relocateShard(
                    shardRouting,
                    targetNode.getNodeId(),
                    allocation.clusterInfo().getShardSize(shardRouting, ShardRouting.UNAVAILABLE_EXPECTED_SHARD_SIZE),
                    "non-preferred",
                    allocation.changes()
                );
                final ShardRouting shard = relocatingShards.v2();
                targetNode.addShard(projectIndex(shard), shard);
                if (logger.isTraceEnabled()) {
                    logger.trace("Moved shard [{}] to node [{}]", shardRouting, targetNode.getRoutingNode());
                }
                return true;
            } else if (moveDecision.isDecisionTaken() && moveDecision.canRemain() == false) {
                logger.trace("[{}][{}] can't move", shardRouting.index(), shardRouting.id());
            }
            return false;
        }

        /**
         * Makes a decision on whether to move a started shard to another node. The following rules apply
         * to the {@link MoveDecision} return object:
         * 1. If the shard is not started, no decision will be taken and {@link MoveDecision#isDecisionTaken()} will return false.
         * 2. If the shard's current allocation is preferred ({@link Decision.Type#YES}), no attempt will be made to move the shard and
         *    {@link MoveDecision#getCanRemainDecision} will have a decision type of YES. All other fields in the object will be null.
         * 3. If the shard is not allowed ({@link Decision.Type#NO}), or not preferred ({@link Decision.Type#NOT_PREFERRED}) to remain
         *    on its current node, then {@link MoveDecision#getAllocationDecision()} will be populated with the decision of moving to
         *    another node. If {@link MoveDecision#forceMove()} returns {@code true}, then {@link MoveDecision#getTargetNode} will return
         *    a non-null value representing a node that returned {@link Decision.Type#YES} from canAllocate, otherwise the assignedNodeId
         *    will be null.
         * 4. If the method is invoked in explain mode (e.g. from the cluster allocation explain APIs), then
         *    {@link MoveDecision#getNodeDecisions} will have a non-null value.
         */
        public MoveDecision decideMoveNonPreferred(final ProjectIndex index, final ShardRouting shardRouting) {
            NodeSorter sorter = nodeSorters.sorterForShard(shardRouting);
            index.assertMatch(shardRouting);

            if (shardRouting.started() == false) {
                // we can only move started shards
                return MoveDecision.NOT_TAKEN;
            }

            final ModelNode sourceNode = nodes.get(shardRouting.currentNodeId());
            assert sourceNode != null && sourceNode.containsShard(index, shardRouting);
            RoutingNode routingNode = sourceNode.getRoutingNode();
            Decision canRemain = allocation.deciders().canRemain(shardRouting, routingNode, allocation);
            if ((canRemain.type() == Type.NOT_PREFERRED || canRemain.type() == Type.NO) == false) {
                return MoveDecision.remain(canRemain);
            }
Review thread on this canRemain check (several comments were truncated during page extraction; gaps are marked […]):

Comment: We will consider […]

Comment: I don't quite follow here. I'd appreciate if you could help me understand it better. Do you mean the decision could be an overall […]?

Comment: In […]. This will come into play now; as @DiannaHohensee and I discussed this morning, it's probably better to run […].

Reviewer: Can you elaborate on that argument about moving non-preferred first? I would naively think we want to ensure we move all hard-rules first, to vacate nodes, and then move the non-preferred after. I think that also avoids this slightly confusing check.

Comment: I'll note that […]

Comment: The […]

Reviewer: The Balanced/DesiredBalanceShardsAllocators do not pick the order of shard movement. The Reconciler does that; the allocator and reconciler happen to have the same order, but I think the priority for the allocator is to make the best choices, not consider shard movement priority. The reconciler behavior is actually in my balancer changes patch. The exception is allocateUnassigned for primaries, for which there's an early exit from the allocators to publish the DesiredBalance ASAP.
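For orientation, this is the phase ordering as it appears in the allocate() hunk earlier in this diff, with the rationale from this thread summarized in comments; it is a restatement of the diff, not new behavior.

    balancer.allocateUnassigned();  // place unassigned shards first
    balancer.moveShards();          // hard rules first: vacate nodes where canRemain == NO
    balancer.moveNonPreferred();    // then relieve hot-spots where canRemain == NOT_PREFERRED
    balancer.balance();             // finally, rebalance by weight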
            sorter.reset(index);
            /*
             * the sorter holds the minimum weight node first for the shards index.
             * We now walk through the nodes until we find a node to allocate the shard.
             * This is not guaranteed to be balanced after this operation we still try best effort to
             * allocate on the minimal eligible node.
             */
            return decideMove(sorter, shardRouting, sourceNode, canRemain, this::decideCanAllocatePreferredOnly);
        }

        /**
         * Move started shards that can not be allocated to a node anymore
         *
@@ -839,6 +944,15 @@ private MoveDecision decideMove(
             );
         }

+        private Decision decideCanAllocatePreferredOnly(ShardRouting shardRouting, RoutingNode target) {
+            Decision decision = allocation.deciders().canAllocate(shardRouting, target, allocation);
+            // not-preferred means no here
+            if (decision.type() == Type.NOT_PREFERRED) {
+                return Decision.NO;
+            }
+            return decision;
+        }
+
         private Decision decideCanAllocate(ShardRouting shardRouting, RoutingNode target) {
             // don't use canRebalance as we want hard filtering rules to apply. See #17698
             return allocation.deciders().canAllocate(shardRouting, target, allocation);
DefaultNonPreferredShardIteratorFactory.java (new file)

@@ -0,0 +1,128 @@
/*
 * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
 * or more contributor license agreements. Licensed under the "Elastic License
 * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
 * Public License v 1"; you may not use this file except in compliance with, at
 * your election, the "Elastic License 2.0", the "GNU Affero General Public
 * License v3.0 only", or the "Server Side Public License, v 1".
 */

package org.elasticsearch.cluster.routing.allocation.allocator;

import org.elasticsearch.cluster.routing.RoutingNode;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.cluster.routing.allocation.RoutingAllocation;
import org.elasticsearch.cluster.routing.allocation.WriteLoadConstraintSettings;
import org.elasticsearch.threadpool.ThreadPool;

import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
import java.util.TreeSet;

/**
 * Non-preferred shard iterator factory that returns the most desirable shards from most-hot-spotted
 * nodes first.
 * Does not return nodes for which we have no write-pool utilization, or shards for which we have no
 * write-load data.
 */
public class DefaultNonPreferredShardIteratorFactory implements NonPreferredShardIteratorFactory {

    private final WriteLoadConstraintSettings writeLoadConstraintSettings;

    public DefaultNonPreferredShardIteratorFactory(WriteLoadConstraintSettings writeLoadConstraintSettings) {
        this.writeLoadConstraintSettings = writeLoadConstraintSettings;
    }

    @Override
    public Iterable<ShardRouting> createNonPreferredShardIterator(RoutingAllocation allocation) {
        if (writeLoadConstraintSettings.getWriteLoadConstraintEnabled().notFullyEnabled()) {
            return Collections.emptyList();
        }
        final Set<NodeShardIterable> hotSpottedNodes = new TreeSet<>(Comparator.reverseOrder());
        final var nodeUsageStatsForThreadPools = allocation.clusterInfo().getNodeUsageStatsForThreadPools();
        for (RoutingNode node : allocation.routingNodes()) {
            var nodeUsageStats = nodeUsageStatsForThreadPools.get(node.nodeId());
            if (nodeUsageStats != null) {
                final var writeThreadPoolStats = nodeUsageStats.threadPoolUsageStatsMap().get(ThreadPool.Names.WRITE);
                assert writeThreadPoolStats != null;
                hotSpottedNodes.add(new NodeShardIterable(allocation, node, writeThreadPoolStats.maxThreadPoolQueueLatencyMillis()));
            }
        }
        return () -> new LazilyExpandingIterator<>(hotSpottedNodes);
    }

    private static class NodeShardIterable implements Iterable<ShardRouting>, Comparable<NodeShardIterable> {

        private final RoutingAllocation allocation;
        private final RoutingNode routingNode;
        private final long maxQueueLatencyMillis;

        private NodeShardIterable(RoutingAllocation allocation, RoutingNode routingNode, long maxQueueLatencyMillis) {
            this.allocation = allocation;
            this.routingNode = routingNode;
            this.maxQueueLatencyMillis = maxQueueLatencyMillis;
        }

        @Override
        public Iterator<ShardRouting> iterator() {
            return createShardIterator();
        }

        @Override
        public int compareTo(NodeShardIterable o) {
            return Long.compare(maxQueueLatencyMillis, o.maxQueueLatencyMillis);
        }

        private Iterator<ShardRouting> createShardIterator() {
            final var shardWriteLoads = allocation.clusterInfo().getShardWriteLoads();
Review thread on building the per-node shard iterator:

Reviewer: It looks like you're creating a list of shards across all nodes. I wonder if instead we could first collect a list of nodes that are hot-spotting, then create separate lists of shards (with their write loads, skipping any shards with 0 load) for each hot-spotting node from allocation.clusterInfo().getShardWriteLoads(), and finally sort and iterate each shard list in the order we prefer, checking whether we can move each shard until we find one that's movable for each node. We'd still need an iterator to sort and manage a list of shards, but it might be simpler just iterating at that level, and then the nodes don't need iterators. We only want to move one shard per node; it's not obvious to me how to easily achieve that when iterating all shards at once.

Author: The approach here is an iterator that returns the shards we'd like to move next, in order of preference. Once we move a shard we ask again for this list. We have to do this because every time we move a shard it can change the list of shards we want to move (e.g. if a shard movement resolves a hot-spot, the shards from that node might appear further down the list in the subsequent iterator, and a lesser-hot-spotted node might appear at the front of it instead). I tried not to do any filtering here, because it's supposed to be the prioritisation logic, where the deciders themselves decide whether we […]. If we go through one of these iterators and don't find any shard we want to move, we break out of the loop and continue to balancing.

Reviewer: But moving a single shard resolves the hot spot. Even if we move one shard off of NodeA, the priority order for further shards to move away from NodeA shouldn't be dynamic 🤔 IIUC, you're trying to fairly spread node hot-spot resolution? Like pick a shard for NodeA, then pick a shard for NodeB, before coming back to NodeA. I don't think that matters for the allocator, which comes up with the final allocation, not the plan for which shards to move first. NodeA is hot-spotting, and we can focus on NodeA's shards to resolve the hot spot before moving on to NodeB's shards. We wouldn't be assigning any of NodeA or NodeB's shards to NodeA or NodeB because they are hot / not-preferred, so there's no interaction there, and no need for evenness / fairness in selection order.

Author: No, as discussed on Zoom, the iterator represents our preference for the next move. E.g. if there are three nodes […] where […]. So we'll iterate through that list, find the first of those shards that can move somewhere, execute the move, then ask for that list again in the next iteration. Say we moved a shard from […] because N is the most likely to be hot-spotted, so it goes to the front of the list. Then we move a shard off of […]. Then the iterator would look something like (although M, N, O could be in any order because they're all equal): […] Which we'd iterate through and find no shard with […].

Reviewer: This ties in with my prior comment about actually calling […].
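To make the "rebuild the preference list after every move" idea concrete, here is a small self-contained sketch. The node names M, N, O, the latencies, the write loads, and the descending-by-load ordering are all made up for illustration; the PR's own code currently sorts by distance-from-mean write load (marked TODO below).

    import java.util.*;

    class PreferenceListDemo {
        public static void main(String[] args) {
            // Hypothetical max write-queue latencies per node (ms); higher means more hot-spotted.
            Map<String, Long> queueLatency = new HashMap<>(Map.of("M", 3_000L, "N", 8_000L, "O", 0L));
            // Hypothetical shards per node with write-load estimates.
            Map<String, Map<String, Double>> shardLoads = Map.of(
                "M", Map.of("m1", 2.0, "m2", 1.0),
                "N", Map.of("n1", 5.0, "n2", 0.5),
                "O", Map.of("o1", 1.0)
            );

            // Preference list: nodes ordered by descending latency, each node's shards by descending load.
            List<String> preferences = queueLatency.entrySet().stream()
                .sorted(Map.Entry.<String, Long>comparingByValue().reversed())
                .flatMap(node -> shardLoads.get(node.getKey()).entrySet().stream()
                    .sorted(Map.Entry.<String, Double>comparingByValue().reversed())
                    .map(Map.Entry::getKey))
                .toList();
            System.out.println(preferences); // [n1, n2, m1, m2, o1]: N's shards first because N is hottest

            // After moving the first movable shard (say n1) the hot-spot picture may change,
            // so the list is rebuilt from fresh stats before choosing the next move.
            queueLatency.put("N", 0L); // pretend the move resolved N's hot-spot
            // ... rebuild 'preferences' the same way and iterate again.
        }
    }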
            final List<ShardRouting> sortedRoutings = new ArrayList<>();
            double totalWriteLoad = 0;
            for (ShardRouting shard : routingNode) {
                Double shardWriteLoad = shardWriteLoads.get(shard.shardId());
                if (shardWriteLoad != null) {
                    sortedRoutings.add(shard);
                    totalWriteLoad += shardWriteLoad;
                }
            }
            // TODO: Work out what this order should be
            // Sort by distance-from-mean-write-load
            double meanWriteLoad = totalWriteLoad / sortedRoutings.size();
            sortedRoutings.sort(Comparator.comparing(sr -> Math.abs(shardWriteLoads.get(sr.shardId()) - meanWriteLoad)));
            return sortedRoutings.iterator();
        }
    }

    static class LazilyExpandingIterator<T> implements Iterator<T> {

        private final Iterator<? extends Iterable<T>> allIterables;
        private Iterator<T> currentIterator;

        LazilyExpandingIterator(Iterable<? extends Iterable<T>> allIterables) {
            this.allIterables = allIterables.iterator();
        }

        @Override
        public boolean hasNext() {
            while (currentIterator == null || currentIterator.hasNext() == false) {
                if (allIterables.hasNext() == false) {
                    return false;
                } else {
                    currentIterator = allIterables.next().iterator();
                }
            }
            return true;
        }

        @Override
        public T next() {
            while (currentIterator == null || currentIterator.hasNext() == false) {
                currentIterator = allIterables.next().iterator();
            }
            return currentIterator.next();
        }
    }
}
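A small usage sketch of the LazilyExpandingIterator above, e.g. from test code in the same package: it flattens an iterable of iterables, only asking each inner iterable for its iterator once the previous one is exhausted, which is what lets the per-node shard lists be built lazily. The integer lists stand in for per-node shard lists.

    List<List<Integer>> perNodeLists = List.of(List.of(1, 2), List.of(), List.of(3));
    Iterator<Integer> it = new DefaultNonPreferredShardIteratorFactory.LazilyExpandingIterator<>(perNodeLists);
    while (it.hasNext()) {
        System.out.print(it.next() + " "); // prints: 1 2 3
    }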
Review thread on the overall factory approach:

Reviewer: Rather than passing the factory implementation through the BalancedShardsAllocator and Balancer constructors, could we directly add the logic to the Balancer in the first place and avoid the factory? The other objects passed through the constructors are usually shared with other components, whereas the new logic only runs in the Balancer. The moveNonPreferred call could be gated by the WRITE_LOAD_DECIDER_ENABLED_SETTING, as an alternative to the NOOP implementation. I don't think tests would even be able to exercise moveNonPreferred without some hot-spot mocking to get to a 5 second queue latency, even if the new logic were enabled by default. Though perhaps there was some other reason for the NOOP / adding it here that I'm missing. Factories seem to come into play often for stateful vs stateless impls, but we don't have an alternative real implementation.

Author: The idea here is just to put a boundary on the responsibilities of the two classes: the BalancedShardsAllocator doesn't care about the iteration order of the shards; as long as the iterator contains all the shards, this logic will work. Similarly to how the BalancedShardsAllocator doesn't care what the individual deciders do, it just knows about YES/NO/THROTTLE/NOT_PREFERRED. In my opinion the interface delineates responsibilities, and allows the reader to not concern themselves with the implementation details of the iteration order when grokking the BalancedShardsAllocator. It also frees us up to bake all kinds of knowledge about the configured deciders into our implementation without that knowledge leaking into the BalancedShardsAllocator.

Author: The default implementation could equally be […]
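The NonPreferredShardIteratorFactory interface itself is not part of the hunks shown above. Based purely on how it is used here (the createNonPreferredShardIterator(allocation) call and the NOOP constant), a minimal shape consistent with this PR might look like the sketch below; the actual declaration in the PR may differ, and imports are omitted.

    public interface NonPreferredShardIteratorFactory {

        /** Shards to consider moving, most-preferred first; the balancer re-requests this after every move. */
        Iterable<ShardRouting> createNonPreferredShardIterator(RoutingAllocation allocation);

        /** Yields nothing, so moveNonPreferred() finds no candidates and returns immediately. */
        NonPreferredShardIteratorFactory NOOP = allocation -> Collections.emptyList();
    }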