Skip to content

Commit 929f65b

Browse files
authored
[Persistent Tasks] Assign based on ProjectId (elastic#130391)
Pass the ProjectId from PersistentTaskClusterService through to all PersistentTasksExecutors when creating node assignments. These PersistentTasksExecutors require the ProjectId during node assignment: - OpenJobPersistentTasksExecutor - SnapshotUpgradeTaskExecutor - StartDatafeedPersistentTasksExecutor - TransformPersistentTasksExecutor
1 parent a786c93 commit 929f65b

File tree

19 files changed

+187
-81
lines changed

19 files changed

+187
-81
lines changed

server/src/main/java/org/elasticsearch/health/node/selection/HealthNodeTaskExecutor.java

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -16,12 +16,14 @@
1616
import org.elasticsearch.cluster.ClusterChangedEvent;
1717
import org.elasticsearch.cluster.ClusterState;
1818
import org.elasticsearch.cluster.ClusterStateListener;
19+
import org.elasticsearch.cluster.metadata.ProjectId;
1920
import org.elasticsearch.cluster.node.DiscoveryNode;
2021
import org.elasticsearch.cluster.service.ClusterService;
2122
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
2223
import org.elasticsearch.common.settings.ClusterSettings;
2324
import org.elasticsearch.common.settings.Setting;
2425
import org.elasticsearch.common.settings.Settings;
26+
import org.elasticsearch.core.Nullable;
2527
import org.elasticsearch.core.TimeValue;
2628
import org.elasticsearch.node.NodeClosedException;
2729
import org.elasticsearch.persistent.AllocatedPersistentTask;
@@ -133,10 +135,11 @@ protected HealthNode createTask(
133135
* Returns the node id from the eligible health nodes
134136
*/
135137
@Override
136-
public PersistentTasksCustomMetadata.Assignment getAssignment(
138+
protected PersistentTasksCustomMetadata.Assignment doGetAssignment(
137139
HealthNodeTaskParams params,
138140
Collection<DiscoveryNode> candidateNodes,
139-
ClusterState clusterState
141+
ClusterState clusterState,
142+
@Nullable ProjectId projectId
140143
) {
141144
DiscoveryNode discoveryNode = selectLeastLoadedNode(clusterState, candidateNodes, DiscoveryNode::canContainData);
142145
if (discoveryNode == null) {

server/src/main/java/org/elasticsearch/persistent/PersistentTasksClusterService.java

Lines changed: 17 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -171,9 +171,9 @@ public ClusterState execute(ClusterState currentState) {
171171
assert (projectId == null && taskExecutor.scope() == PersistentTasksExecutor.Scope.CLUSTER)
172172
|| (projectId != null && taskExecutor.scope() == PersistentTasksExecutor.Scope.PROJECT)
173173
: "inconsistent project-id [" + projectId + "] and task scope [" + taskExecutor.scope() + "]";
174-
taskExecutor.validate(taskParams, currentState);
174+
taskExecutor.validate(taskParams, currentState, projectId);
175175

176-
Assignment assignment = createAssignment(taskName, taskParams, currentState);
176+
Assignment assignment = createAssignment(taskName, taskParams, currentState, projectId);
177177
logger.debug("creating {} persistent task [{}] with assignment [{}]", taskTypeString(projectId), taskName, assignment);
178178
return builder.addTask(taskId, taskName, taskParams, assignment).buildAndUpdate(currentState, projectId);
179179
}
@@ -449,7 +449,8 @@ public void clusterStateProcessed(ClusterState oldState, ClusterState newState)
449449
private <Params extends PersistentTaskParams> Assignment createAssignment(
450450
final String taskName,
451451
final Params taskParams,
452-
final ClusterState currentState
452+
final ClusterState currentState,
453+
@Nullable final ProjectId projectId
453454
) {
454455
PersistentTasksExecutor<Params> persistentTasksExecutor = registry.getPersistentTaskExecutorSafe(taskName);
455456

@@ -468,7 +469,7 @@ private <Params extends PersistentTaskParams> Assignment createAssignment(
468469
// Task assignment should not rely on node order
469470
Randomness.shuffle(candidateNodes);
470471

471-
final Assignment assignment = persistentTasksExecutor.getAssignment(taskParams, candidateNodes, currentState);
472+
final Assignment assignment = persistentTasksExecutor.getAssignment(taskParams, candidateNodes, currentState, projectId);
472473
assert assignment != null : "getAssignment() should always return an Assignment object, containing a node or a reason why not";
473474
assert (assignment.getExecutorNode() == null
474475
|| currentState.metadata().nodeShutdowns().contains(assignment.getExecutorNode()) == false)
@@ -540,8 +541,8 @@ public void clusterStateProcessed(ClusterState oldState, ClusterState newState)
540541
* persistent tasks changed.
541542
*/
542543
boolean shouldReassignPersistentTasks(final ClusterChangedEvent event) {
543-
final List<PersistentTasks> allTasks = PersistentTasks.getAllTasks(event.state()).map(Tuple::v2).toList();
544-
if (allTasks.isEmpty()) {
544+
var projectIdToTasksIterator = PersistentTasks.getAllTasks(event.state()).iterator();
545+
if (projectIdToTasksIterator.hasNext() == false) {
545546
return false;
546547
}
547548

@@ -553,10 +554,16 @@ boolean shouldReassignPersistentTasks(final ClusterChangedEvent event) {
553554
|| event.metadataChanged()
554555
|| masterChanged) {
555556

556-
for (PersistentTasks tasks : allTasks) {
557-
for (PersistentTask<?> task : tasks.tasks()) {
557+
while (projectIdToTasksIterator.hasNext()) {
558+
var projectIdToTasks = projectIdToTasksIterator.next();
559+
for (PersistentTask<?> task : projectIdToTasks.v2().tasks()) {
558560
if (needsReassignment(task.getAssignment(), event.state().nodes())) {
559-
Assignment assignment = createAssignment(task.getTaskName(), task.getParams(), event.state());
561+
Assignment assignment = createAssignment(
562+
task.getTaskName(),
563+
task.getParams(),
564+
event.state(),
565+
projectIdToTasks.v1()
566+
);
560567
if (Objects.equals(assignment, task.getAssignment()) == false) {
561568
return true;
562569
}
@@ -602,7 +609,7 @@ private ClusterState reassignClusterOrSingleProjectTasks(@Nullable final Project
602609
// We need to check if removed nodes were running any of the tasks and reassign them
603610
for (PersistentTask<?> task : tasks.tasks()) {
604611
if (needsReassignment(task.getAssignment(), nodes)) {
605-
Assignment assignment = createAssignment(task.getTaskName(), task.getParams(), clusterState);
612+
Assignment assignment = createAssignment(task.getTaskName(), task.getParams(), clusterState, projectId);
606613
if (Objects.equals(assignment, task.getAssignment()) == false) {
607614
logger.trace(
608615
"reassigning {} task {} from node {} to node {}",

server/src/main/java/org/elasticsearch/persistent/PersistentTasksExecutor.java

Lines changed: 27 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010
package org.elasticsearch.persistent;
1111

1212
import org.elasticsearch.cluster.ClusterState;
13+
import org.elasticsearch.cluster.metadata.ProjectId;
1314
import org.elasticsearch.cluster.node.DiscoveryNode;
1415
import org.elasticsearch.core.Nullable;
1516
import org.elasticsearch.core.Tuple;
@@ -63,7 +64,31 @@ public Scope scope() {
6364
* <p>
6465
* The default implementation returns the least loaded data node from amongst the collection of candidate nodes
6566
*/
66-
public Assignment getAssignment(Params params, Collection<DiscoveryNode> candidateNodes, ClusterState clusterState) {
67+
public final Assignment getAssignment(
68+
Params params,
69+
Collection<DiscoveryNode> candidateNodes,
70+
ClusterState clusterState,
71+
@Nullable ProjectId projectId
72+
) {
73+
assert (scope() == Scope.PROJECT && projectId != null) || (scope() == Scope.CLUSTER && projectId == null)
74+
: "inconsistent project-id [" + projectId + "] and task scope [" + scope() + "]";
75+
return doGetAssignment(params, candidateNodes, clusterState, projectId);
76+
}
77+
78+
/**
79+
* Returns the node id where the params has to be executed,
80+
* <p>
81+
* The default implementation returns the least loaded data node from amongst the collection of candidate nodes.
82+
* <p>
83+
* If {@link #scope()} returns CLUSTER, then {@link ProjectId} will be null.
84+
* If {@link #scope()} returns PROJECT, then {@link ProjectId} will not be null.
85+
*/
86+
protected Assignment doGetAssignment(
87+
Params params,
88+
Collection<DiscoveryNode> candidateNodes,
89+
ClusterState clusterState,
90+
@Nullable ProjectId projectId
91+
) {
6792
DiscoveryNode discoveryNode = selectLeastLoadedNode(clusterState, candidateNodes, DiscoveryNode::canContainData);
6893
if (discoveryNode == null) {
6994
return NO_NODE_FOUND;
@@ -105,7 +130,7 @@ protected DiscoveryNode selectLeastLoadedNode(
105130
* <p>
106131
* Throws an exception if the supplied params cannot be executed on the cluster in the current state.
107132
*/
108-
public void validate(Params params, ClusterState clusterState) {}
133+
public void validate(Params params, ClusterState clusterState, @Nullable ProjectId projectId) {}
109134

110135
/**
111136
* Creates a AllocatedPersistentTask for communicating with task manager

server/src/test/java/org/elasticsearch/persistent/PersistentTasksClusterServiceTests.java

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
import org.elasticsearch.cluster.metadata.IndexMetadata;
2121
import org.elasticsearch.cluster.metadata.Metadata;
2222
import org.elasticsearch.cluster.metadata.NodesShutdownMetadata;
23+
import org.elasticsearch.cluster.metadata.ProjectId;
2324
import org.elasticsearch.cluster.metadata.ProjectMetadata;
2425
import org.elasticsearch.cluster.metadata.SingleNodeShutdownMetadata;
2526
import org.elasticsearch.cluster.node.DiscoveryNode;
@@ -1087,7 +1088,12 @@ public Scope scope() {
10871088
}
10881089

10891090
@Override
1090-
public Assignment getAssignment(P params, Collection<DiscoveryNode> candidateNodes, ClusterState clusterState) {
1091+
protected Assignment doGetAssignment(
1092+
P params,
1093+
Collection<DiscoveryNode> candidateNodes,
1094+
ClusterState clusterState,
1095+
ProjectId projectId
1096+
) {
10911097
return fn.apply(params, candidateNodes, clusterState);
10921098
}
10931099

server/src/test/java/org/elasticsearch/persistent/TestPersistentTasksPlugin.java

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
import org.elasticsearch.client.internal.ElasticsearchClient;
2626
import org.elasticsearch.cluster.ClusterState;
2727
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
28+
import org.elasticsearch.cluster.metadata.ProjectId;
2829
import org.elasticsearch.cluster.node.DiscoveryNode;
2930
import org.elasticsearch.cluster.service.ClusterService;
3031
import org.elasticsearch.common.Strings;
@@ -326,12 +327,17 @@ public static void setNonClusterStateCondition(boolean nonClusterStateCondition)
326327
}
327328

328329
@Override
329-
public Assignment getAssignment(TestParams params, Collection<DiscoveryNode> candidateNodes, ClusterState clusterState) {
330+
protected Assignment doGetAssignment(
331+
TestParams params,
332+
Collection<DiscoveryNode> candidateNodes,
333+
ClusterState clusterState,
334+
ProjectId projectId
335+
) {
330336
if (nonClusterStateCondition == false) {
331337
return new Assignment(null, "non cluster state condition prevents assignment");
332338
}
333339
if (params == null || params.getExecutorNodeAttr() == null) {
334-
return super.getAssignment(params, candidateNodes, clusterState);
340+
return super.doGetAssignment(params, candidateNodes, clusterState, projectId);
335341
} else {
336342
DiscoveryNode executorNode = selectLeastLoadedNode(
337343
clusterState,

x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowTasksExecutor.java

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@
3232
import org.elasticsearch.cluster.metadata.AliasMetadata;
3333
import org.elasticsearch.cluster.metadata.IndexMetadata;
3434
import org.elasticsearch.cluster.metadata.MappingMetadata;
35+
import org.elasticsearch.cluster.metadata.ProjectId;
3536
import org.elasticsearch.cluster.node.DiscoveryNode;
3637
import org.elasticsearch.cluster.routing.IndexRoutingTable;
3738
import org.elasticsearch.cluster.routing.ShardRouting;
@@ -43,6 +44,7 @@
4344
import org.elasticsearch.common.util.concurrent.EsExecutors;
4445
import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
4546
import org.elasticsearch.core.CheckedConsumer;
47+
import org.elasticsearch.core.Nullable;
4648
import org.elasticsearch.core.TimeValue;
4749
import org.elasticsearch.index.Index;
4850
import org.elasticsearch.index.IndexNotFoundException;
@@ -118,7 +120,7 @@ public ShardFollowTasksExecutor(Client client, ThreadPool threadPool, ClusterSer
118120
}
119121

120122
@Override
121-
public void validate(ShardFollowTask params, ClusterState clusterState) {
123+
public void validate(ShardFollowTask params, ClusterState clusterState, @Nullable ProjectId projectId) {
122124
final IndexRoutingTable routingTable = clusterState.getRoutingTable().index(params.getFollowShardId().getIndex());
123125
final ShardRouting primaryShard = routingTable.shard(params.getFollowShardId().id()).primaryShard();
124126
if (primaryShard.active() == false) {
@@ -129,10 +131,11 @@ public void validate(ShardFollowTask params, ClusterState clusterState) {
129131
private static final Assignment NO_ASSIGNMENT = new Assignment(null, "no nodes found with data and remote cluster client roles");
130132

131133
@Override
132-
public Assignment getAssignment(
134+
protected Assignment doGetAssignment(
133135
final ShardFollowTask params,
134-
Collection<DiscoveryNode> candidateNodes,
135-
final ClusterState clusterState
136+
final Collection<DiscoveryNode> candidateNodes,
137+
final ClusterState clusterState,
138+
@Nullable final ProjectId projectId
136139
) {
137140
final DiscoveryNode node = selectLeastLoadedNode(
138141
clusterState,

x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardFollowTasksExecutorAssignmentTests.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010
import org.elasticsearch.client.internal.Client;
1111
import org.elasticsearch.cluster.ClusterName;
1212
import org.elasticsearch.cluster.ClusterState;
13+
import org.elasticsearch.cluster.metadata.ProjectId;
1314
import org.elasticsearch.cluster.node.DiscoveryNode;
1415
import org.elasticsearch.cluster.node.DiscoveryNodeRole;
1516
import org.elasticsearch.cluster.node.DiscoveryNodeUtils;
@@ -93,7 +94,8 @@ private void runAssignmentTest(
9394
final Assignment assignment = executor.getAssignment(
9495
mock(ShardFollowTask.class),
9596
clusterStateBuilder.nodes().getAllNodes(),
96-
clusterStateBuilder.build()
97+
clusterStateBuilder.build(),
98+
ProjectId.DEFAULT
9799
);
98100
consumer.accept(theSpecial, assignment);
99101
}

x-pack/plugin/downsample/src/main/java/org/elasticsearch/xpack/downsample/DownsampleShardPersistentTaskExecutor.java

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -22,13 +22,15 @@
2222
import org.elasticsearch.action.support.TransportAction;
2323
import org.elasticsearch.client.internal.Client;
2424
import org.elasticsearch.cluster.ClusterState;
25+
import org.elasticsearch.cluster.metadata.ProjectId;
2526
import org.elasticsearch.cluster.node.DiscoveryNode;
2627
import org.elasticsearch.cluster.routing.IndexShardRoutingTable;
2728
import org.elasticsearch.cluster.routing.ShardRouting;
2829
import org.elasticsearch.common.io.stream.StreamOutput;
2930
import org.elasticsearch.common.settings.Settings;
3031
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
3132
import org.elasticsearch.common.util.concurrent.EsExecutors;
33+
import org.elasticsearch.core.Nullable;
3234
import org.elasticsearch.index.IndexNotFoundException;
3335
import org.elasticsearch.index.mapper.TimeSeriesIdFieldMapper;
3436
import org.elasticsearch.index.shard.ShardId;
@@ -116,7 +118,7 @@ protected AllocatedPersistentTask createTask(
116118
}
117119

118120
@Override
119-
public void validate(DownsampleShardTaskParams params, ClusterState clusterState) {
121+
public void validate(DownsampleShardTaskParams params, ClusterState clusterState, @Nullable ProjectId projectId) {
120122
// This is just a pre-check, but doesn't prevent from avoiding from aborting the task when source index disappeared
121123
// after initial creation of the persistent task.
122124
var indexShardRouting = findShardRoutingTable(params.shardId(), clusterState);
@@ -126,10 +128,11 @@ public void validate(DownsampleShardTaskParams params, ClusterState clusterState
126128
}
127129

128130
@Override
129-
public PersistentTasksCustomMetadata.Assignment getAssignment(
131+
protected PersistentTasksCustomMetadata.Assignment doGetAssignment(
130132
final DownsampleShardTaskParams params,
131133
final Collection<DiscoveryNode> candidateNodes,
132-
final ClusterState clusterState
134+
final ClusterState clusterState,
135+
@Nullable final ProjectId projectId
133136
) {
134137
// NOTE: downsampling works by running a task per each shard of the source index.
135138
// Here we make sure we assign the task to the actual node holding the shard identified by

x-pack/plugin/downsample/src/test/java/org/elasticsearch/xpack/downsample/DownsampleShardPersistentTaskExecutorTests.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -96,7 +96,7 @@ public void testGetAssignment() {
9696
Strings.EMPTY_ARRAY,
9797
Strings.EMPTY_ARRAY
9898
);
99-
var result = executor.getAssignment(params, Set.of(node), clusterState);
99+
var result = executor.getAssignment(params, Set.of(node), clusterState, projectId);
100100
assertThat(result.getExecutorNode(), equalTo(node.getId()));
101101
}
102102

@@ -128,7 +128,7 @@ public void testGetAssignmentMissingIndex() {
128128
Strings.EMPTY_ARRAY,
129129
Strings.EMPTY_ARRAY
130130
);
131-
var result = executor.getAssignment(params, Set.of(node), clusterState);
131+
var result = executor.getAssignment(params, Set.of(node), clusterState, projectId);
132132
assertThat(result.getExecutorNode(), equalTo(node.getId()));
133133
assertThat(result.getExplanation(), equalTo("a node to fail and stop this persistent task"));
134134
}
@@ -165,7 +165,7 @@ public void testGetStatelessAssignment() {
165165
Strings.EMPTY_ARRAY,
166166
Strings.EMPTY_ARRAY
167167
);
168-
var result = executor.getAssignment(params, Set.of(indexNode, searchNode), clusterState);
168+
var result = executor.getAssignment(params, Set.of(indexNode, searchNode), clusterState, projectId);
169169
assertThat(result.getExecutorNode(), nullValue());
170170

171171
// Assign a copy of the shard to a search node
@@ -185,7 +185,7 @@ public void testGetStatelessAssignment() {
185185
.build()
186186
)
187187
.build();
188-
result = executor.getAssignment(params, Set.of(indexNode, searchNode), clusterState);
188+
result = executor.getAssignment(params, Set.of(indexNode, searchNode), clusterState, projectId);
189189
assertThat(result.getExecutorNode(), equalTo(searchNode.getId()));
190190
}
191191

x-pack/plugin/migrate/src/main/java/org/elasticsearch/system_indices/task/SystemIndexMigrationExecutor.java

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -9,10 +9,12 @@
99

1010
import org.elasticsearch.client.internal.Client;
1111
import org.elasticsearch.cluster.ClusterState;
12+
import org.elasticsearch.cluster.metadata.ProjectId;
1213
import org.elasticsearch.cluster.node.DiscoveryNode;
1314
import org.elasticsearch.cluster.service.ClusterService;
1415
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
1516
import org.elasticsearch.common.settings.IndexScopedSettings;
17+
import org.elasticsearch.core.Nullable;
1618
import org.elasticsearch.indices.SystemIndices;
1719
import org.elasticsearch.persistent.AllocatedPersistentTask;
1820
import org.elasticsearch.persistent.PersistentTaskParams;
@@ -86,10 +88,11 @@ protected AllocatedPersistentTask createTask(
8688
}
8789

8890
@Override
89-
public PersistentTasksCustomMetadata.Assignment getAssignment(
91+
protected PersistentTasksCustomMetadata.Assignment doGetAssignment(
9092
SystemIndexMigrationTaskParams params,
9193
Collection<DiscoveryNode> candidateNodes,
92-
ClusterState clusterState
94+
ClusterState clusterState,
95+
@Nullable ProjectId projectId
9396
) {
9497
// This should select from master-eligible nodes because we already require all master-eligible nodes to have all plugins installed.
9598
// However, due to a misunderstanding, this code as-written needs to run on the master node in particular. This is not a fundamental

0 commit comments

Comments
 (0)