Skip to content

Commit 0253b78

Browse files
committed
Ensure downsample tasks in stateless work with replicas
1 parent 77b459c commit 0253b78

File tree

3 files changed

+100
-5
lines changed

3 files changed

+100
-5
lines changed

x-pack/plugin/downsample/src/main/java/org/elasticsearch/xpack/downsample/Downsample.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -98,6 +98,7 @@ public List<PersistentTasksExecutor<?>> getPersistentTasksExecutor(
9898
new DownsampleShardPersistentTaskExecutor(
9999
client,
100100
DownsampleShardTask.TASK_NAME,
101+
clusterService.getSettings(),
101102
threadPool.executor(DOWNSAMPLE_TASK_THREAD_POOL_NAME)
102103
)
103104
);

x-pack/plugin/downsample/src/main/java/org/elasticsearch/xpack/downsample/DownsampleShardPersistentTaskExecutor.java

Lines changed: 32 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
import org.elasticsearch.cluster.routing.IndexShardRoutingTable;
2727
import org.elasticsearch.cluster.routing.ShardRouting;
2828
import org.elasticsearch.common.io.stream.StreamOutput;
29+
import org.elasticsearch.common.settings.Settings;
2930
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
3031
import org.elasticsearch.common.util.concurrent.EsExecutors;
3132
import org.elasticsearch.index.IndexNotFoundException;
@@ -52,15 +53,19 @@
5253
import java.util.Collections;
5354
import java.util.Map;
5455
import java.util.Objects;
56+
import java.util.Set;
5557
import java.util.concurrent.Executor;
58+
import java.util.stream.Collectors;
5659

5760
public class DownsampleShardPersistentTaskExecutor extends PersistentTasksExecutor<DownsampleShardTaskParams> {
5861
private static final Logger LOGGER = LogManager.getLogger(DownsampleShardPersistentTaskExecutor.class);
5962
private final Client client;
63+
private final boolean isStateless;
6064

61-
public DownsampleShardPersistentTaskExecutor(final Client client, final String taskName, final Executor executor) {
65+
public DownsampleShardPersistentTaskExecutor(final Client client, final String taskName, Settings settings, final Executor executor) {
6266
super(taskName, executor);
6367
this.client = Objects.requireNonNull(client);
68+
this.isStateless = DiscoveryNode.isStateless(settings);
6469
}
6570

6671
@Override
@@ -142,13 +147,13 @@ public PersistentTasksCustomMetadata.Assignment getAssignment(
142147
return new PersistentTasksCustomMetadata.Assignment(node.getId(), "a node to fail and stop this persistent task");
143148
}
144149

145-
final ShardRouting shardRouting = indexShardRouting.primaryShard();
146-
if (shardRouting.started() == false) {
150+
final Set<String> eligibleNodes = getEligibleNodes(indexShardRouting);
151+
if (eligibleNodes.isEmpty()) {
147152
return NO_NODE_FOUND;
148153
}
149154

150155
return candidateNodes.stream()
151-
.filter(candidateNode -> candidateNode.getId().equals(shardRouting.currentNodeId()))
156+
.filter(candidateNode -> eligibleNodes.contains(candidateNode.getId()))
152157
.findAny()
153158
.map(
154159
node -> new PersistentTasksCustomMetadata.Assignment(
@@ -159,6 +164,29 @@ public PersistentTasksCustomMetadata.Assignment getAssignment(
159164
.orElse(NO_NODE_FOUND);
160165
}
161166

167+
/**
168+
* An eligible node to run the downsampling task for a shard is a node that holds
169+
* a searchable version of this shard.
170+
* In stateless deployment we choose only nodes that hold search shards.
171+
* Otherwise, we choose the node that holds the primary shard.
172+
* Visible for testing.
173+
* @param indexShardRouting the routing of the shard to be downsampled
174+
* @return the set of candidate nodes downsampling can run on.
175+
*/
176+
Set<String> getEligibleNodes(IndexShardRoutingTable indexShardRouting) {
177+
if (isStateless) {
178+
return indexShardRouting.assignedShards()
179+
.stream()
180+
.filter(shardRouting -> shardRouting.primary() == false && shardRouting.started())
181+
.map(ShardRouting::currentNodeId)
182+
.collect(Collectors.toSet());
183+
}
184+
if (indexShardRouting.primaryShard().started()) {
185+
return Set.of(indexShardRouting.primaryShard().currentNodeId());
186+
}
187+
return Set.of();
188+
}
189+
162190
@Override
163191
public Executor getExecutor() {
164192
// The delegate action forks to the a downsample thread:

x-pack/plugin/downsample/src/test/java/org/elasticsearch/xpack/downsample/DownsampleShardPersistentTaskExecutorTests.java

Lines changed: 67 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
import org.elasticsearch.cluster.routing.RoutingTable;
2121
import org.elasticsearch.common.Strings;
2222
import org.elasticsearch.common.UUIDs;
23+
import org.elasticsearch.common.settings.Settings;
2324
import org.elasticsearch.core.Tuple;
2425
import org.elasticsearch.index.Index;
2526
import org.elasticsearch.index.shard.ShardId;
@@ -35,9 +36,11 @@
3536
import java.util.Set;
3637
import java.util.concurrent.Executor;
3738

39+
import static org.elasticsearch.cluster.node.DiscoveryNode.STATELESS_ENABLED_SETTING_NAME;
3840
import static org.elasticsearch.cluster.routing.ShardRoutingState.STARTED;
3941
import static org.elasticsearch.cluster.routing.TestShardRouting.shardRoutingBuilder;
4042
import static org.hamcrest.Matchers.equalTo;
43+
import static org.hamcrest.Matchers.nullValue;
4144
import static org.mockito.Mockito.mock;
4245

4346
public class DownsampleShardPersistentTaskExecutorTests extends ESTestCase {
@@ -57,7 +60,12 @@ public void setup() {
5760
"metrics-app1",
5861
List.of(new Tuple<>(start, end))
5962
);
60-
executor = new DownsampleShardPersistentTaskExecutor(mock(Client.class), DownsampleShardTask.TASK_NAME, mock(Executor.class));
63+
executor = new DownsampleShardPersistentTaskExecutor(
64+
mock(Client.class),
65+
DownsampleShardTask.TASK_NAME,
66+
Settings.EMPTY,
67+
mock(Executor.class)
68+
);
6169
}
6270

6371
public void testGetAssignment() {
@@ -124,7 +132,65 @@ public void testGetAssignmentMissingIndex() {
124132
assertThat(result.getExplanation(), equalTo("a node to fail and stop this persistent task"));
125133
}
126134

135+
public void testGetStatelessAssignment() {
136+
executor = new DownsampleShardPersistentTaskExecutor(
137+
mock(Client.class),
138+
DownsampleShardTask.TASK_NAME,
139+
Settings.builder().put(STATELESS_ENABLED_SETTING_NAME, "true").build(),
140+
mock(Executor.class)
141+
);
142+
var backingIndex = initialClusterState.metadata().getProject(projectId).dataStreams().get("metrics-app1").getWriteIndex();
143+
var searchNode = newNode(Set.of(DiscoveryNodeRole.SEARCH_ROLE));
144+
var indexNode = newNode(Set.of(DiscoveryNodeRole.INDEX_ROLE));
145+
var shardId = new ShardId(backingIndex, 0);
146+
var clusterState = ClusterState.builder(initialClusterState)
147+
.nodes(new DiscoveryNodes.Builder().add(indexNode).add(searchNode).build())
148+
.putRoutingTable(
149+
projectId,
150+
RoutingTable.builder()
151+
.add(
152+
IndexRoutingTable.builder(backingIndex)
153+
.addShard(shardRoutingBuilder(shardId, indexNode.getId(), true, STARTED).withRecoverySource(null).build())
154+
)
155+
.build()
156+
)
157+
.build();
158+
159+
var params = new DownsampleShardTaskParams(
160+
new DownsampleConfig(new DateHistogramInterval("1h")),
161+
shardId.getIndexName(),
162+
1,
163+
1,
164+
shardId,
165+
Strings.EMPTY_ARRAY,
166+
Strings.EMPTY_ARRAY,
167+
Strings.EMPTY_ARRAY
168+
);
169+
var result = executor.getAssignment(params, Set.of(indexNode, searchNode), clusterState);
170+
assertThat(result.getExecutorNode(), nullValue());
171+
172+
// Assign a copy of the shard to a search node
173+
clusterState = ClusterState.builder(clusterState)
174+
.putRoutingTable(
175+
projectId,
176+
RoutingTable.builder()
177+
.add(
178+
IndexRoutingTable.builder(backingIndex)
179+
.addShard(shardRoutingBuilder(shardId, indexNode.getId(), true, STARTED).withRecoverySource(null).build())
180+
.addShard(shardRoutingBuilder(shardId, searchNode.getId(), false, STARTED).withRecoverySource(null).build())
181+
)
182+
.build()
183+
)
184+
.build();
185+
result = executor.getAssignment(params, Set.of(indexNode, searchNode), clusterState);
186+
assertThat(result.getExecutorNode(), equalTo(searchNode.getId()));
187+
}
188+
127189
private static DiscoveryNode newNode() {
190+
return newNode(DiscoveryNodeRole.roles());
191+
}
192+
193+
private static DiscoveryNode newNode(Set<DiscoveryNodeRole> nodes) {
128194
return DiscoveryNodeUtils.create(
129195
"node_" + UUIDs.randomBase64UUID(random()),
130196
buildNewFakeTransportAddress(),

0 commit comments

Comments
 (0)