Skip to content
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions docs/changelog/136066.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
pr: 136066
summary: Simulate shards moved by explicit commands
area: Allocation
type: enhancement
issues: []
Original file line number Diff line number Diff line change
Expand Up @@ -156,9 +156,8 @@ public void allocate(RoutingAllocation allocation) {
assert allocation.ignoreDisable() == false;
assert allocation.isSimulating() == false || allocation.routingNodes().hasInactiveShards() == false
: "expect no initializing shard, but got " + allocation.routingNodes();
// TODO: ES-12943 cannot assert the following because shards moved by commands are not simulated promptly in DesiredBalanceComputer
// assert allocation.isSimulating() == false || allocation.routingNodes().getRelocatingShardCount() == 0
// : "expect no relocating shard, but got " + allocation.routingNodes();
assert allocation.isSimulating() == false || allocation.routingNodes().getRelocatingShardCount() == 0
: "expect no relocating shard, but got " + allocation.routingNodes();

if (allocation.routingNodes().size() == 0) {
failAllocationOfNewPrimaries(allocation);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import org.elasticsearch.index.shard.ShardId;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedList;
Expand Down Expand Up @@ -291,7 +292,23 @@ public DesiredBalance compute(
while ((commands = pendingDesiredBalanceMoves.poll()) != null) {
for (MoveAllocationCommand command : commands) {
try {
command.execute(routingAllocation, false);
final var rerouteExplanation = command.execute(routingAllocation, false);
assert rerouteExplanation.decisions().type() != Decision.Type.NO;
if (rerouteExplanation.decisions().type() != Decision.Type.NO) {
final ShardRouting[] initializingShards = routingNodes.node(
routingAllocation.nodes().resolveNode(command.toNode()).getId()
).initializing();
assert initializingShards.length == 1
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we assert no initializing shards after the while loop in general? That helps understand this assertion without reading the prior code.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is asserted inside BalancedShardAllocator to ensure each simulation call starts without any initialising shad.

&& routingAllocation.nodes()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: let us split into two assertions, one for only 1 initializing shard and one for the right node holding the initializing shard.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yep see b9dc835

.resolveNode(command.fromNode())
.getId()
.equals(initializingShards[0].relocatingNodeId())
: "expect one relocating shard, but got : " + List.of(initializingShards);
Arrays.stream(initializingShards).forEach(shard -> {
clusterInfoSimulator.simulateShardStarted(shard);
routingNodes.startShard(shard, changes, 0L);
});
}
} catch (RuntimeException e) {
logger.debug(
() -> "move shard ["
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import org.elasticsearch.cluster.routing.IndexShardRoutingTable;
import org.elasticsearch.cluster.routing.RecoverySource;
import org.elasticsearch.cluster.routing.RoutingChangesObserver;
import org.elasticsearch.cluster.routing.RoutingNode;
import org.elasticsearch.cluster.routing.RoutingNodes;
import org.elasticsearch.cluster.routing.RoutingTable;
import org.elasticsearch.cluster.routing.ShardRouting;
Expand All @@ -44,7 +45,9 @@
import org.elasticsearch.cluster.routing.allocation.RoutingAllocation;
import org.elasticsearch.cluster.routing.allocation.ShardAllocationDecision;
import org.elasticsearch.cluster.routing.allocation.command.MoveAllocationCommand;
import org.elasticsearch.cluster.routing.allocation.decider.AllocationDecider;
import org.elasticsearch.cluster.routing.allocation.decider.AllocationDeciders;
import org.elasticsearch.cluster.routing.allocation.decider.Decision;
import org.elasticsearch.cluster.routing.allocation.decider.ThrottlingAllocationDecider;
import org.elasticsearch.common.Randomness;
import org.elasticsearch.common.UUIDs;
Expand All @@ -54,6 +57,7 @@
import org.elasticsearch.common.time.TimeProviderUtils;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.util.Maps;
import org.elasticsearch.common.util.set.Sets;
import org.elasticsearch.core.Strings;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.core.Tuple;
Expand Down Expand Up @@ -99,6 +103,7 @@
import static org.hamcrest.Matchers.aMapWithSize;
import static org.hamcrest.Matchers.allOf;
import static org.hamcrest.Matchers.anyOf;
import static org.hamcrest.Matchers.arrayWithSize;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.everyItem;
import static org.hamcrest.Matchers.hasEntry;
Expand Down Expand Up @@ -566,7 +571,22 @@ public void testNoDataNodes() {
}

public void testAppliesMoveCommands() {
var desiredBalanceComputer = createDesiredBalanceComputer();
var desiredBalanceComputer = createDesiredBalanceComputer(new ShardsAllocator() {
@Override
public void allocate(RoutingAllocation allocation) {
assertThat(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Perhaps add a comment that this runs right after the moves are applied.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmm, perhaps this is actually better as an assertion? Which I sort of proposed above. Happy to keep it though for the test to have verification, but we'll probably hit the assertion first.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added comment in fcc6449

In BalancedShardsAllocator, we do assert that the initial routingAlloction has no initializing shard. Is it similar to what you have in mind? It's not necessary that allocate runs "right" after the move commands, e.g. we can have additional code after the move commands. The important thing is that all moving shards are started once computer is ready to call allocate.

"unexpected relocating shards: " + allocation.routingNodes(),
allocation.routingNodes().getRelocatingShardCount(),
equalTo(0)
);
assertThat(allocation.routingNodes().node("node-2").started(), arrayWithSize(2));
}

@Override
public ShardAllocationDecision decideShardAllocation(ShardRouting shard, RoutingAllocation allocation) {
throw new AssertionError("only used for allocation explain");
}
});
var clusterState = createInitialClusterState(3);
var index = clusterState.metadata().getProject().index(TEST_INDEX).getIndex();

Expand All @@ -578,23 +598,99 @@ public void testAppliesMoveCommands() {
}
clusterState = rebuildRoutingTable(clusterState, routingNodes);

final var dataNodeIds = clusterState.nodes().getDataNodes().keySet();
for (var nodeId : List.of("node-0", "node-1")) {
final var desiredBalanceInput = new DesiredBalanceInput(
randomInt(),
new RoutingAllocation(new AllocationDeciders(List.of(new AllocationDecider() {
@Override
public Decision canAllocate(ShardRouting shardRouting, RoutingNode node, RoutingAllocation allocation) {
// Move command works every decision except NO
return randomFrom(Decision.YES, Decision.THROTTLE, Decision.THROTTLE);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Did you mean not-preferred rather than 2xthrottle?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should have been not-preferred, updated in 5fbd0f9

}
})), clusterState, ClusterInfo.EMPTY, SnapshotShardSizeInfo.EMPTY, 0L),
List.of()
);
var desiredBalance = desiredBalanceComputer.compute(
DesiredBalance.BECOME_MASTER_INITIAL,
desiredBalanceInput,
queue(
new MoveAllocationCommand(index.getName(), 0, nodeId, "node-2"),
new MoveAllocationCommand(index.getName(), 1, nodeId, "node-2")
),
input -> true
);

final Set<String> expectedNodeIds = Sets.difference(dataNodeIds, Set.of(nodeId));
assertDesiredAssignments(
desiredBalance,
Map.of(
new ShardId(index, 0),
new ShardAssignment(expectedNodeIds, 2, 0, 0),
new ShardId(index, 1),
new ShardAssignment(expectedNodeIds, 2, 0, 0)
)
);
}
}

public void testCannotApplyMoveCommand() {
var desiredBalanceComputer = createDesiredBalanceComputer(new ShardsAllocator() {
@Override
public void allocate(RoutingAllocation allocation) {
assertThat(
"unexpected relocating shards: " + allocation.routingNodes(),
allocation.routingNodes().getRelocatingShardCount(),
equalTo(0)
);
assertThat(allocation.routingNodes().node("node-2").isEmpty(), equalTo(true));
}

@Override
public ShardAllocationDecision decideShardAllocation(ShardRouting shard, RoutingAllocation allocation) {
throw new AssertionError("only used for allocation explain");
}
});
var clusterState = createInitialClusterState(3);
var index = clusterState.metadata().getProject().index(TEST_INDEX).getIndex();

var changes = new RoutingChangesObserver.DelegatingRoutingChangesObserver();
var routingNodes = clusterState.mutableRoutingNodes();
for (var iterator = routingNodes.unassigned().iterator(); iterator.hasNext();) {
var shardRouting = iterator.next();
routingNodes.startShard(iterator.initialize(shardRouting.primary() ? "node-0" : "node-1", null, 0L, changes), changes, 0L);
}
clusterState = rebuildRoutingTable(clusterState, routingNodes);

final var dataNodeIds = clusterState.nodes().getDataNodes().keySet();
final var desiredBalanceInput = new DesiredBalanceInput(
randomInt(),
new RoutingAllocation(new AllocationDeciders(List.of(new AllocationDecider() {
@Override
public Decision canAllocate(ShardRouting shardRouting, RoutingNode node, RoutingAllocation allocation) {
return Decision.NO;
}
})), clusterState, ClusterInfo.EMPTY, SnapshotShardSizeInfo.EMPTY, 0L),
List.of()
);
var desiredBalance = desiredBalanceComputer.compute(
DesiredBalance.BECOME_MASTER_INITIAL,
createInput(clusterState),
desiredBalanceInput,
queue(
new MoveAllocationCommand(index.getName(), 0, "node-1", "node-2"),
new MoveAllocationCommand(index.getName(), 1, "node-1", "node-2")
new MoveAllocationCommand(index.getName(), 0, randomFrom("node-0", "node-1"), "node-2"),
new MoveAllocationCommand(index.getName(), 1, randomFrom("node-0", "node-1"), "node-2")
),
input -> true
);

final Set<String> expectedNodeIds = Set.of("node-0", "node-1");
assertDesiredAssignments(
desiredBalance,
Map.of(
new ShardId(index, 0),
new ShardAssignment(Set.of("node-0", "node-2"), 2, 0, 0),
new ShardAssignment(expectedNodeIds, 2, 0, 0),
new ShardId(index, 1),
new ShardAssignment(Set.of("node-0", "node-2"), 2, 0, 0)
new ShardAssignment(expectedNodeIds, 2, 0, 0)
)
);
}
Expand Down