Commit 9db54d5

Refactor based on review comments
1 parent 0253b78 commit 9db54d5

File tree

1 file changed (+23, -33 lines)


x-pack/plugin/downsample/src/main/java/org/elasticsearch/xpack/downsample/DownsampleShardPersistentTaskExecutor.java

Lines changed: 23 additions & 33 deletions
@@ -53,9 +53,7 @@
 import java.util.Collections;
 import java.util.Map;
 import java.util.Objects;
-import java.util.Set;
 import java.util.concurrent.Executor;
-import java.util.stream.Collectors;
 
 public class DownsampleShardPersistentTaskExecutor extends PersistentTasksExecutor<DownsampleShardTaskParams> {
     private static final Logger LOGGER = LogManager.getLogger(DownsampleShardPersistentTaskExecutor.class);
@@ -147,44 +145,36 @@ public PersistentTasksCustomMetadata.Assignment getAssignment(
             return new PersistentTasksCustomMetadata.Assignment(node.getId(), "a node to fail and stop this persistent task");
         }
 
-        final Set<String> eligibleNodes = getEligibleNodes(indexShardRouting);
-        if (eligibleNodes.isEmpty()) {
-            return NO_NODE_FOUND;
-        }
-
-        return candidateNodes.stream()
-            .filter(candidateNode -> eligibleNodes.contains(candidateNode.getId()))
+        // We find the nodes that hold the eligible shards.
+        // If the current node of such a shard is a candidate node, then we assign the task there.
+        // This code is inefficient, but we are relying on the laziness of the intermediate operations
+        // and the assumption that the first shard we examine has high chances of being assigned to a candidate node.
+        return indexShardRouting.activeShards()
+            .stream()
+            .filter(this::isEligible)
+            .map(ShardRouting::currentNodeId)
+            .filter(nodeId -> isCandidateNode(candidateNodes, nodeId))
             .findAny()
-            .map(
-                node -> new PersistentTasksCustomMetadata.Assignment(
-                    node.getId(),
-                    "downsampling using node holding shard [" + shardId + "]"
-                )
-            )
+            .map(nodeId -> new PersistentTasksCustomMetadata.Assignment(nodeId, "downsampling using node holding shard [" + shardId + "]"))
            .orElse(NO_NODE_FOUND);
     }
 
     /**
-     * An eligible node to run the downsampling task for a shard is a node that holds
-     * a searchable version of this shard.
-     * In stateless deployment we choose only nodes that hold search shards.
-     * Otherwise, we choose the node that holds the primary shard.
-     * Visible for testing.
-     * @param indexShardRouting the routing of the shard to be downsampled
-     * @return the set of candidate nodes downsampling can run on.
+     * Only shards that can be searched can be used as the source of a downsampling task.
+     * In stateless deployment, this means that shards that CANNOT be promoted to primary can be used.
+     * For simplicity, in non-stateless deployments we use the primary shard.
      */
-    Set<String> getEligibleNodes(IndexShardRoutingTable indexShardRouting) {
-        if (isStateless) {
-            return indexShardRouting.assignedShards()
-                .stream()
-                .filter(shardRouting -> shardRouting.primary() == false && shardRouting.started())
-                .map(ShardRouting::currentNodeId)
-                .collect(Collectors.toSet());
-        }
-        if (indexShardRouting.primaryShard().started()) {
-            return Set.of(indexShardRouting.primaryShard().currentNodeId());
+    private boolean isEligible(ShardRouting shardRouting) {
+        return shardRouting.started() && (isStateless ? shardRouting.isPromotableToPrimary() == false : shardRouting.primary());
+    }
+
+    private boolean isCandidateNode(Collection<DiscoveryNode> candidateNodes, String nodeId) {
+        for (DiscoveryNode candidateNode : candidateNodes) {
+            if (candidateNode.getId().equals(nodeId)) {
+                return true;
+            }
         }
-        return Set.of();
+        return false;
     }
 
     @Override
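
The in-code comments above lean on the laziness of stream intermediate operations: because findAny() short-circuits, the pipeline stops at the first eligible shard whose current node is also a candidate. Below is a minimal, self-contained sketch of that behaviour. The Shard and Node records, the IS_STATELESS flag, and the LazyAssignmentSketch class are hypothetical stand-ins for the real ShardRouting, DiscoveryNode, and executor state; only the shape of the filter/map/filter/findAny pipeline and the two helper predicates are taken from the diff above.

// Illustrative sketch only; the nested records are NOT the real Elasticsearch types,
// they just carry the fields the pipeline in this commit relies on.
import java.util.List;
import java.util.Optional;

public class LazyAssignmentSketch {

    // Hypothetical stand-ins for ShardRouting and DiscoveryNode.
    record Shard(String currentNodeId, boolean started, boolean promotableToPrimary, boolean primary) {}
    record Node(String id) {}

    // Assumption for this example: a stateless deployment.
    static final boolean IS_STATELESS = true;

    // Mirrors the new isEligible(ShardRouting) predicate from the commit.
    static boolean isEligible(Shard shard) {
        return shard.started() && (IS_STATELESS ? shard.promotableToPrimary() == false : shard.primary());
    }

    // Mirrors the new isCandidateNode(Collection<DiscoveryNode>, String) helper.
    static boolean isCandidateNode(List<Node> candidateNodes, String nodeId) {
        for (Node candidate : candidateNodes) {
            if (candidate.id().equals(nodeId)) {
                return true;
            }
        }
        return false;
    }

    public static void main(String[] args) {
        List<Shard> activeShards = List.of(
            new Shard("node-1", true, false, false), // eligible search shard on a candidate node
            new Shard("node-2", true, true, true),   // primary, not eligible in stateless mode
            new Shard("node-4", true, false, false)  // also eligible and on a candidate node, but never examined: findAny() short-circuits
        );
        List<Node> candidateNodes = List.of(new Node("node-1"), new Node("node-4"));

        // Same shape as the new getAssignment() pipeline: filter -> map -> filter -> findAny.
        // The intermediate operations are lazy, so evaluation stops as soon as the first
        // eligible shard sitting on a candidate node has been found.
        Optional<String> assignedNodeId = activeShards.stream()
            .filter(LazyAssignmentSketch::isEligible)
            .map(Shard::currentNodeId)
            .filter(nodeId -> isCandidateNode(candidateNodes, nodeId))
            .findAny();

        System.out.println(assignedNodeId.orElse("NO_NODE_FOUND")); // prints node-1 in this sequential run
    }
}

As in the production code, falling back to NO_NODE_FOUND via orElse covers the case where no eligible shard currently lives on a candidate node.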
