Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions docs/changelog/136066.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
pr: 136066
summary: Simulate shards moved by explicit commands
area: Allocation
type: enhancement
issues: []
Original file line number Diff line number Diff line change
Expand Up @@ -156,9 +156,8 @@ public void allocate(RoutingAllocation allocation) {
assert allocation.ignoreDisable() == false;
assert allocation.isSimulating() == false || allocation.routingNodes().hasInactiveShards() == false
: "expect no initializing shard, but got " + allocation.routingNodes();
// TODO: ES-12943 cannot assert the following because shards moved by commands are not simulated promptly in DesiredBalanceComputer
// assert allocation.isSimulating() == false || allocation.routingNodes().getRelocatingShardCount() == 0
// : "expect no relocating shard, but got " + allocation.routingNodes();
assert allocation.isSimulating() == false || allocation.routingNodes().getRelocatingShardCount() == 0
: "expect no relocating shard, but got " + allocation.routingNodes();

if (allocation.routingNodes().size() == 0) {
failAllocationOfNewPrimaries(allocation);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -291,7 +291,25 @@ public DesiredBalance compute(
while ((commands = pendingDesiredBalanceMoves.poll()) != null) {
for (MoveAllocationCommand command : commands) {
try {
command.execute(routingAllocation, false);
final var rerouteExplanation = command.execute(routingAllocation, false);
assert rerouteExplanation.decisions().type() != Decision.Type.NO : "should have thrown for NO decision";
if (rerouteExplanation.decisions().type() != Decision.Type.NO) {
final ShardRouting[] initializingShards = routingNodes.node(
routingAllocation.nodes().resolveNode(command.toNode()).getId()
).initializing();
assert initializingShards.length == 1
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we assert no initializing shards after the while loop in general? That helps understand this assertion without reading the prior code.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is asserted inside BalancedShardsAllocator to ensure each simulation call starts without any initialising shard.

: "expect exactly one relocating shard, but got: " + List.of(initializingShards);
final var initializingShard = initializingShards[0];
assert routingAllocation.nodes()
.resolveNode(command.fromNode())
.getId()
.equals(initializingShard.relocatingNodeId())
: initializingShard
+ " has unexpected relocation source node, expect node "
+ routingAllocation.nodes().resolveNode(command.fromNode());
clusterInfoSimulator.simulateShardStarted(initializingShard);
routingNodes.startShard(initializingShard, changes, 0L);
}
} catch (RuntimeException e) {
logger.debug(
() -> "move shard ["
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import org.elasticsearch.cluster.routing.IndexShardRoutingTable;
import org.elasticsearch.cluster.routing.RecoverySource;
import org.elasticsearch.cluster.routing.RoutingChangesObserver;
import org.elasticsearch.cluster.routing.RoutingNode;
import org.elasticsearch.cluster.routing.RoutingNodes;
import org.elasticsearch.cluster.routing.RoutingTable;
import org.elasticsearch.cluster.routing.ShardRouting;
Expand All @@ -44,7 +45,9 @@
import org.elasticsearch.cluster.routing.allocation.RoutingAllocation;
import org.elasticsearch.cluster.routing.allocation.ShardAllocationDecision;
import org.elasticsearch.cluster.routing.allocation.command.MoveAllocationCommand;
import org.elasticsearch.cluster.routing.allocation.decider.AllocationDecider;
import org.elasticsearch.cluster.routing.allocation.decider.AllocationDeciders;
import org.elasticsearch.cluster.routing.allocation.decider.Decision;
import org.elasticsearch.cluster.routing.allocation.decider.ThrottlingAllocationDecider;
import org.elasticsearch.common.Randomness;
import org.elasticsearch.common.UUIDs;
Expand All @@ -54,6 +57,7 @@
import org.elasticsearch.common.time.TimeProviderUtils;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.util.Maps;
import org.elasticsearch.common.util.set.Sets;
import org.elasticsearch.core.Strings;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.core.Tuple;
Expand Down Expand Up @@ -99,6 +103,7 @@
import static org.hamcrest.Matchers.aMapWithSize;
import static org.hamcrest.Matchers.allOf;
import static org.hamcrest.Matchers.anyOf;
import static org.hamcrest.Matchers.arrayWithSize;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.everyItem;
import static org.hamcrest.Matchers.hasEntry;
Expand Down Expand Up @@ -566,7 +571,24 @@ public void testNoDataNodes() {
}

public void testAppliesMoveCommands() {
var desiredBalanceComputer = createDesiredBalanceComputer();
var desiredBalanceComputer = createDesiredBalanceComputer(new ShardsAllocator() {
@Override
public void allocate(RoutingAllocation allocation) {
// This runs after the move commands have been applied; we assert that the relocating shards caused by the move
// commands have all been started by the simulation.
assertThat(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Perhaps add a comment that this runs right after the moves are applied.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmm, perhaps this is actually better as an assertion? Which I sort of proposed above. Happy to keep it though for the test to have verification, but we'll probably hit the assertion first.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added comment in fcc6449

In BalancedShardsAllocator, we do assert that the initial routingAllocation has no initializing shard. Is it similar to what you have in mind? It's not necessary that allocate runs "right" after the move commands, e.g. we can have additional code after the move commands. The important thing is that all moving shards are started once the computer is ready to call allocate.

"unexpected relocating shards: " + allocation.routingNodes(),
allocation.routingNodes().getRelocatingShardCount(),
equalTo(0)
);
assertThat(allocation.routingNodes().node("node-2").started(), arrayWithSize(2));
}

@Override
public ShardAllocationDecision decideShardAllocation(ShardRouting shard, RoutingAllocation allocation) {
throw new AssertionError("only used for allocation explain");
}
});
var clusterState = createInitialClusterState(3);
var index = clusterState.metadata().getProject().index(TEST_INDEX).getIndex();

Expand All @@ -578,23 +600,99 @@ public void testAppliesMoveCommands() {
}
clusterState = rebuildRoutingTable(clusterState, routingNodes);

final var dataNodeIds = clusterState.nodes().getDataNodes().keySet();
for (var nodeId : List.of("node-0", "node-1")) {
final var desiredBalanceInput = DesiredBalanceInput.create(
randomInt(),
new RoutingAllocation(new AllocationDeciders(List.of(new AllocationDecider() {
@Override
public Decision canAllocate(ShardRouting shardRouting, RoutingNode node, RoutingAllocation allocation) {
// A move command works with every decision except NO, so return a randomly chosen non-NO decision.
return randomFrom(Decision.YES, Decision.THROTTLE, Decision.NOT_PREFERRED);
}
})), clusterState, ClusterInfo.EMPTY, SnapshotShardSizeInfo.EMPTY, 0L)
);
var desiredBalance = desiredBalanceComputer.compute(
DesiredBalance.BECOME_MASTER_INITIAL,
desiredBalanceInput,
queue(
new MoveAllocationCommand(index.getName(), 0, nodeId, "node-2"),
new MoveAllocationCommand(index.getName(), 1, nodeId, "node-2")
),
input -> true
);

final Set<String> expectedNodeIds = Sets.difference(dataNodeIds, Set.of(nodeId));
assertDesiredAssignments(
desiredBalance,
Map.of(
new ShardId(index, 0),
new ShardAssignment(expectedNodeIds, 2, 0, 0),
new ShardId(index, 1),
new ShardAssignment(expectedNodeIds, 2, 0, 0)
)
);
}
}

public void testCannotApplyMoveCommand() {
var desiredBalanceComputer = createDesiredBalanceComputer(new ShardsAllocator() {
@Override
public void allocate(RoutingAllocation allocation) {
// This runs after the move commands have been executed and have failed, so no movement should be visible
// in the routing nodes: nothing is relocating and node-2, the target of the commands, holds no shards.
assertThat(
"unexpected relocating shards: " + allocation.routingNodes(),
allocation.routingNodes().getRelocatingShardCount(),
equalTo(0)
);
assertThat(allocation.routingNodes().node("node-2").isEmpty(), equalTo(true));
}

@Override
public ShardAllocationDecision decideShardAllocation(ShardRouting shard, RoutingAllocation allocation) {
throw new AssertionError("only used for allocation explain");
}
});
var clusterState = createInitialClusterState(3);
var index = clusterState.metadata().getProject().index(TEST_INDEX).getIndex();

var changes = new RoutingChangesObserver.DelegatingRoutingChangesObserver();
var routingNodes = clusterState.mutableRoutingNodes();
for (var iterator = routingNodes.unassigned().iterator(); iterator.hasNext();) {
var shardRouting = iterator.next();
routingNodes.startShard(iterator.initialize(shardRouting.primary() ? "node-0" : "node-1", null, 0L, changes), changes, 0L);
}
clusterState = rebuildRoutingTable(clusterState, routingNodes);

final var desiredBalanceInput = DesiredBalanceInput.create(
randomInt(),
new RoutingAllocation(new AllocationDeciders(List.of(new AllocationDecider() {
@Override
public Decision canAllocate(ShardRouting shardRouting, RoutingNode node, RoutingAllocation allocation) {
// Always return NO so that every move command is rejected and fails silently.
return Decision.NO;
}
})), clusterState, ClusterInfo.EMPTY, SnapshotShardSizeInfo.EMPTY, 0L)
);
var desiredBalance = desiredBalanceComputer.compute(
DesiredBalance.BECOME_MASTER_INITIAL,
createInput(clusterState),
desiredBalanceInput,
queue(
new MoveAllocationCommand(index.getName(), 0, "node-1", "node-2"),
new MoveAllocationCommand(index.getName(), 1, "node-1", "node-2")
new MoveAllocationCommand(index.getName(), 0, randomFrom("node-0", "node-1"), "node-2"),
new MoveAllocationCommand(index.getName(), 1, randomFrom("node-0", "node-1"), "node-2")
),
input -> true
);

final Set<String> expectedNodeIds = Set.of("node-0", "node-1");
assertDesiredAssignments(
desiredBalance,
Map.of(
new ShardId(index, 0),
new ShardAssignment(Set.of("node-0", "node-2"), 2, 0, 0),
new ShardAssignment(expectedNodeIds, 2, 0, 0),
new ShardId(index, 1),
new ShardAssignment(Set.of("node-0", "node-2"), 2, 0, 0)
new ShardAssignment(expectedNodeIds, 2, 0, 0)
)
);
}
Expand Down