Skip to content

Commit 33e7db0

Browse files
authored
[Downsampling] Ensure downsample tasks in stateless work with replicas (#130160)
Downsample tasks run on nodes that hold a searchable version of the shard to be downsampled. So far, we have chosen the primary shard. This is sufficient in general, but not in a stateless deployment, where only non-primary shards are searchable. In this PR we add functionality to detect a stateless deployment and choose only search shards.
1 parent d3f042e commit 33e7db0

File tree

4 files changed

+105
-17
lines changed

4 files changed

+105
-17
lines changed

x-pack/plugin/downsample/qa/rest/src/yamlRestTest/resources/rest-api-spec/test/downsample/10_basic.yml

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1555,7 +1555,6 @@ setup:
15551555
body:
15561556
settings:
15571557
number_of_shards: 1
1558-
number_of_replicas: 0
15591558
index:
15601559
mode: time_series
15611560
routing_path: [ k8s.pod.name ]
@@ -1639,7 +1638,6 @@ setup:
16391638
body:
16401639
settings:
16411640
number_of_shards: 1
1642-
number_of_replicas: 0
16431641
index:
16441642
mode: time_series
16451643
routing_path: [ k8s.pod.name, k8s.pod.empty ]

x-pack/plugin/downsample/src/main/java/org/elasticsearch/xpack/downsample/Downsample.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -98,6 +98,7 @@ public List<PersistentTasksExecutor<?>> getPersistentTasksExecutor(
9898
new DownsampleShardPersistentTaskExecutor(
9999
client,
100100
DownsampleShardTask.TASK_NAME,
101+
clusterService.getSettings(),
101102
threadPool.executor(DOWNSAMPLE_TASK_THREAD_POOL_NAME)
102103
)
103104
);

x-pack/plugin/downsample/src/main/java/org/elasticsearch/xpack/downsample/DownsampleShardPersistentTaskExecutor.java

Lines changed: 32 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
import org.elasticsearch.cluster.routing.IndexShardRoutingTable;
2727
import org.elasticsearch.cluster.routing.ShardRouting;
2828
import org.elasticsearch.common.io.stream.StreamOutput;
29+
import org.elasticsearch.common.settings.Settings;
2930
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
3031
import org.elasticsearch.common.util.concurrent.EsExecutors;
3132
import org.elasticsearch.index.IndexNotFoundException;
@@ -57,10 +58,12 @@
5758
public class DownsampleShardPersistentTaskExecutor extends PersistentTasksExecutor<DownsampleShardTaskParams> {
5859
private static final Logger LOGGER = LogManager.getLogger(DownsampleShardPersistentTaskExecutor.class);
5960
private final Client client;
61+
private final boolean isStateless;
6062

61-
public DownsampleShardPersistentTaskExecutor(final Client client, final String taskName, final Executor executor) {
63+
public DownsampleShardPersistentTaskExecutor(final Client client, final String taskName, Settings settings, final Executor executor) {
6264
super(taskName, executor);
6365
this.client = Objects.requireNonNull(client);
66+
this.isStateless = DiscoveryNode.isStateless(settings);
6467
}
6568

6669
@Override
@@ -142,23 +145,38 @@ public PersistentTasksCustomMetadata.Assignment getAssignment(
142145
return new PersistentTasksCustomMetadata.Assignment(node.getId(), "a node to fail and stop this persistent task");
143146
}
144147

145-
final ShardRouting shardRouting = indexShardRouting.primaryShard();
146-
if (shardRouting.started() == false) {
147-
return NO_NODE_FOUND;
148-
}
149-
150-
return candidateNodes.stream()
151-
.filter(candidateNode -> candidateNode.getId().equals(shardRouting.currentNodeId()))
148+
// We find the nodes that hold the eligible shards.
149+
// If the current node of such a shard is a candidate node, then we assign the task there.
150+
// This code is inefficient, but we are relying on the laziness of the intermediate operations
151+
// and the assumption that the first shard we examine has high chances of being assigned to a candidate node.
152+
return indexShardRouting.activeShards()
153+
.stream()
154+
.filter(this::isEligible)
155+
.map(ShardRouting::currentNodeId)
156+
.filter(nodeId -> isCandidateNode(candidateNodes, nodeId))
152157
.findAny()
153-
.map(
154-
node -> new PersistentTasksCustomMetadata.Assignment(
155-
node.getId(),
156-
"downsampling using node holding shard [" + shardId + "]"
157-
)
158-
)
158+
.map(nodeId -> new PersistentTasksCustomMetadata.Assignment(nodeId, "downsampling using node holding shard [" + shardId + "]"))
159159
.orElse(NO_NODE_FOUND);
160160
}
161161

162+
/**
163+
* Only shards that can be searched can be used as the source of a downsampling task.
164+
* In a stateless deployment, this means that shards that CANNOT be promoted to primary can be used.
165+
* For simplicity, in non-stateless deployments we use the primary shard.
166+
*/
167+
private boolean isEligible(ShardRouting shardRouting) {
168+
return shardRouting.started() && (isStateless ? shardRouting.isPromotableToPrimary() == false : shardRouting.primary());
169+
}
170+
171+
private boolean isCandidateNode(Collection<DiscoveryNode> candidateNodes, String nodeId) {
172+
for (DiscoveryNode candidateNode : candidateNodes) {
173+
if (candidateNode.getId().equals(nodeId)) {
174+
return true;
175+
}
176+
}
177+
return false;
178+
}
179+
162180
@Override
163181
public Executor getExecutor() {
164182
// The delegate action forks to a downsample thread:

x-pack/plugin/downsample/src/test/java/org/elasticsearch/xpack/downsample/DownsampleShardPersistentTaskExecutorTests.java

Lines changed: 72 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,8 +18,10 @@
1818
import org.elasticsearch.cluster.node.DiscoveryNodes;
1919
import org.elasticsearch.cluster.routing.IndexRoutingTable;
2020
import org.elasticsearch.cluster.routing.RoutingTable;
21+
import org.elasticsearch.cluster.routing.ShardRouting;
2122
import org.elasticsearch.common.Strings;
2223
import org.elasticsearch.common.UUIDs;
24+
import org.elasticsearch.common.settings.Settings;
2325
import org.elasticsearch.core.Tuple;
2426
import org.elasticsearch.index.Index;
2527
import org.elasticsearch.index.shard.ShardId;
@@ -35,9 +37,11 @@
3537
import java.util.Set;
3638
import java.util.concurrent.Executor;
3739

40+
import static org.elasticsearch.cluster.node.DiscoveryNode.STATELESS_ENABLED_SETTING_NAME;
3841
import static org.elasticsearch.cluster.routing.ShardRoutingState.STARTED;
3942
import static org.elasticsearch.cluster.routing.TestShardRouting.shardRoutingBuilder;
4043
import static org.hamcrest.Matchers.equalTo;
44+
import static org.hamcrest.Matchers.nullValue;
4145
import static org.mockito.Mockito.mock;
4246

4347
public class DownsampleShardPersistentTaskExecutorTests extends ESTestCase {
@@ -57,7 +61,12 @@ public void setup() {
5761
"metrics-app1",
5862
List.of(new Tuple<>(start, end))
5963
);
60-
executor = new DownsampleShardPersistentTaskExecutor(mock(Client.class), DownsampleShardTask.TASK_NAME, mock(Executor.class));
64+
executor = new DownsampleShardPersistentTaskExecutor(
65+
mock(Client.class),
66+
DownsampleShardTask.TASK_NAME,
67+
Settings.EMPTY,
68+
mock(Executor.class)
69+
);
6170
}
6271

6372
public void testGetAssignment() {
@@ -124,7 +133,69 @@ public void testGetAssignmentMissingIndex() {
124133
assertThat(result.getExplanation(), equalTo("a node to fail and stop this persistent task"));
125134
}
126135

136+
public void testGetStatelessAssignment() {
137+
executor = new DownsampleShardPersistentTaskExecutor(
138+
mock(Client.class),
139+
DownsampleShardTask.TASK_NAME,
140+
Settings.builder().put(STATELESS_ENABLED_SETTING_NAME, "true").build(),
141+
mock(Executor.class)
142+
);
143+
var backingIndex = initialClusterState.metadata().getProject(projectId).dataStreams().get("metrics-app1").getWriteIndex();
144+
var searchNode = newNode(Set.of(DiscoveryNodeRole.SEARCH_ROLE));
145+
var indexNode = newNode(Set.of(DiscoveryNodeRole.INDEX_ROLE));
146+
var shardId = new ShardId(backingIndex, 0);
147+
var clusterState = ClusterState.builder(initialClusterState)
148+
.nodes(new DiscoveryNodes.Builder().add(indexNode).add(searchNode).build())
149+
.putRoutingTable(
150+
projectId,
151+
RoutingTable.builder()
152+
.add(
153+
IndexRoutingTable.builder(backingIndex)
154+
.addShard(shardRoutingBuilder(shardId, indexNode.getId(), true, STARTED).withRecoverySource(null).build())
155+
)
156+
.build()
157+
)
158+
.build();
159+
160+
var params = new DownsampleShardTaskParams(
161+
new DownsampleConfig(new DateHistogramInterval("1h")),
162+
shardId.getIndexName(),
163+
1,
164+
1,
165+
shardId,
166+
Strings.EMPTY_ARRAY,
167+
Strings.EMPTY_ARRAY,
168+
Strings.EMPTY_ARRAY
169+
);
170+
var result = executor.getAssignment(params, Set.of(indexNode, searchNode), clusterState);
171+
assertThat(result.getExecutorNode(), nullValue());
172+
173+
// Assign a copy of the shard to a search node
174+
clusterState = ClusterState.builder(clusterState)
175+
.putRoutingTable(
176+
projectId,
177+
RoutingTable.builder()
178+
.add(
179+
IndexRoutingTable.builder(backingIndex)
180+
.addShard(shardRoutingBuilder(shardId, indexNode.getId(), true, STARTED).withRecoverySource(null).build())
181+
.addShard(
182+
shardRoutingBuilder(shardId, searchNode.getId(), false, STARTED).withRecoverySource(null)
183+
.withRole(ShardRouting.Role.SEARCH_ONLY)
184+
.build()
185+
)
186+
)
187+
.build()
188+
)
189+
.build();
190+
result = executor.getAssignment(params, Set.of(indexNode, searchNode), clusterState);
191+
assertThat(result.getExecutorNode(), equalTo(searchNode.getId()));
192+
}
193+
127194
private static DiscoveryNode newNode() {
195+
return newNode(DiscoveryNodeRole.roles());
196+
}
197+
198+
private static DiscoveryNode newNode(Set<DiscoveryNodeRole> nodes) {
128199
return DiscoveryNodeUtils.create(
129200
"node_" + UUIDs.randomBase64UUID(random()),
130201
buildNewFakeTransportAddress(),

0 commit comments

Comments
 (0)