Skip to content

Commit 8b236cb

Browse files
committed
[Transform] Assign based on ProjectId
Pass the ProjectId from PersistentTaskClusterService through to all PersistentTasksExecutors when creating node assignments. These PersistentTasksExecutors require the ProjectId during node assignment: - OpenJobPersistentTasksExecutor - SnapshotUpgradeTaskExecutor - StartDatafeedPersistentTasksExecutor - TransformPersistentTasksExecutor Implemented TransformPersistentTasksExecutor's getAssignment using the ProjectId by reading the TransformMetadata and RoutingTable from the ProjectMetadata.
1 parent 1e6473a commit 8b236cb

File tree

21 files changed

+237
-71
lines changed

21 files changed

+237
-71
lines changed

server/src/main/java/org/elasticsearch/health/node/selection/HealthNodeTaskExecutor.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,12 +16,14 @@
1616
import org.elasticsearch.cluster.ClusterChangedEvent;
1717
import org.elasticsearch.cluster.ClusterState;
1818
import org.elasticsearch.cluster.ClusterStateListener;
19+
import org.elasticsearch.cluster.metadata.ProjectId;
1920
import org.elasticsearch.cluster.node.DiscoveryNode;
2021
import org.elasticsearch.cluster.service.ClusterService;
2122
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
2223
import org.elasticsearch.common.settings.ClusterSettings;
2324
import org.elasticsearch.common.settings.Setting;
2425
import org.elasticsearch.common.settings.Settings;
26+
import org.elasticsearch.core.Nullable;
2527
import org.elasticsearch.core.TimeValue;
2628
import org.elasticsearch.node.NodeClosedException;
2729
import org.elasticsearch.persistent.AllocatedPersistentTask;
@@ -136,7 +138,8 @@ protected HealthNode createTask(
136138
public PersistentTasksCustomMetadata.Assignment getAssignment(
137139
HealthNodeTaskParams params,
138140
Collection<DiscoveryNode> candidateNodes,
139-
ClusterState clusterState
141+
ClusterState clusterState,
142+
@Nullable ProjectId projectId
140143
) {
141144
DiscoveryNode discoveryNode = selectLeastLoadedNode(clusterState, candidateNodes, DiscoveryNode::canContainData);
142145
if (discoveryNode == null) {

server/src/main/java/org/elasticsearch/persistent/PersistentTasksClusterService.java

Lines changed: 17 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -171,9 +171,9 @@ public ClusterState execute(ClusterState currentState) {
171171
assert (projectId == null && taskExecutor.scope() == PersistentTasksExecutor.Scope.CLUSTER)
172172
|| (projectId != null && taskExecutor.scope() == PersistentTasksExecutor.Scope.PROJECT)
173173
: "inconsistent project-id [" + projectId + "] and task scope [" + taskExecutor.scope() + "]";
174-
taskExecutor.validate(taskParams, currentState);
174+
taskExecutor.validate(taskParams, currentState, projectId);
175175

176-
Assignment assignment = createAssignment(taskName, taskParams, currentState);
176+
Assignment assignment = createAssignment(taskName, taskParams, currentState, projectId);
177177
logger.debug("creating {} persistent task [{}] with assignment [{}]", taskTypeString(projectId), taskName, assignment);
178178
return builder.addTask(taskId, taskName, taskParams, assignment).buildAndUpdate(currentState, projectId);
179179
}
@@ -449,7 +449,8 @@ public void clusterStateProcessed(ClusterState oldState, ClusterState newState)
449449
private <Params extends PersistentTaskParams> Assignment createAssignment(
450450
final String taskName,
451451
final Params taskParams,
452-
final ClusterState currentState
452+
final ClusterState currentState,
453+
@Nullable final ProjectId projectId
453454
) {
454455
PersistentTasksExecutor<Params> persistentTasksExecutor = registry.getPersistentTaskExecutorSafe(taskName);
455456

@@ -468,7 +469,7 @@ private <Params extends PersistentTaskParams> Assignment createAssignment(
468469
// Task assignment should not rely on node order
469470
Randomness.shuffle(candidateNodes);
470471

471-
final Assignment assignment = persistentTasksExecutor.getAssignment(taskParams, candidateNodes, currentState);
472+
final Assignment assignment = persistentTasksExecutor.getAssignment(taskParams, candidateNodes, currentState, projectId);
472473
assert assignment != null : "getAssignment() should always return an Assignment object, containing a node or a reason why not";
473474
assert (assignment.getExecutorNode() == null
474475
|| currentState.metadata().nodeShutdowns().contains(assignment.getExecutorNode()) == false)
@@ -540,8 +541,8 @@ public void clusterStateProcessed(ClusterState oldState, ClusterState newState)
540541
* persistent tasks changed.
541542
*/
542543
boolean shouldReassignPersistentTasks(final ClusterChangedEvent event) {
543-
final List<PersistentTasks> allTasks = PersistentTasks.getAllTasks(event.state()).map(Tuple::v2).toList();
544-
if (allTasks.isEmpty()) {
544+
var projectIdToTasksIterator = PersistentTasks.getAllTasks(event.state()).iterator();
545+
if (projectIdToTasksIterator.hasNext() == false) {
545546
return false;
546547
}
547548

@@ -553,10 +554,16 @@ boolean shouldReassignPersistentTasks(final ClusterChangedEvent event) {
553554
|| event.metadataChanged()
554555
|| masterChanged) {
555556

556-
for (PersistentTasks tasks : allTasks) {
557-
for (PersistentTask<?> task : tasks.tasks()) {
557+
while (projectIdToTasksIterator.hasNext()) {
558+
var projectIdToTasks = projectIdToTasksIterator.next();
559+
for (PersistentTask<?> task : projectIdToTasks.v2().tasks()) {
558560
if (needsReassignment(task.getAssignment(), event.state().nodes())) {
559-
Assignment assignment = createAssignment(task.getTaskName(), task.getParams(), event.state());
561+
Assignment assignment = createAssignment(
562+
task.getTaskName(),
563+
task.getParams(),
564+
event.state(),
565+
projectIdToTasks.v1()
566+
);
560567
if (Objects.equals(assignment, task.getAssignment()) == false) {
561568
return true;
562569
}
@@ -602,7 +609,7 @@ private ClusterState reassignClusterOrSingleProjectTasks(@Nullable final Project
602609
// We need to check if removed nodes were running any of the tasks and reassign them
603610
for (PersistentTask<?> task : tasks.tasks()) {
604611
if (needsReassignment(task.getAssignment(), nodes)) {
605-
Assignment assignment = createAssignment(task.getTaskName(), task.getParams(), clusterState);
612+
Assignment assignment = createAssignment(task.getTaskName(), task.getParams(), clusterState, projectId);
606613
if (Objects.equals(assignment, task.getAssignment()) == false) {
607614
logger.trace(
608615
"reassigning {} task {} from node {} to node {}",

server/src/main/java/org/elasticsearch/persistent/PersistentTasksExecutor.java

Lines changed: 20 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010
package org.elasticsearch.persistent;
1111

1212
import org.elasticsearch.cluster.ClusterState;
13+
import org.elasticsearch.cluster.metadata.ProjectId;
1314
import org.elasticsearch.cluster.node.DiscoveryNode;
1415
import org.elasticsearch.core.Nullable;
1516
import org.elasticsearch.core.Tuple;
@@ -58,12 +59,29 @@ public Scope scope() {
5859

5960
public static final Assignment NO_NODE_FOUND = new Assignment(null, "no appropriate nodes found for the assignment");
6061

62+
/**
63+
* Use {@link #getAssignment(PersistentTaskParams, Collection, ClusterState, ProjectId)}.
64+
*/
65+
@Deprecated
66+
public PersistentTasksCustomMetadata.Assignment getAssignment(
67+
Params params,
68+
Collection<DiscoveryNode> candidateNodes,
69+
ClusterState clusterState
70+
) {
71+
return getAssignment(params, candidateNodes, clusterState, null);
72+
}
73+
6174
/**
6275
* Returns the node id where the params has to be executed,
6376
* <p>
6477
* The default implementation returns the least loaded data node from amongst the collection of candidate nodes
6578
*/
66-
public Assignment getAssignment(Params params, Collection<DiscoveryNode> candidateNodes, ClusterState clusterState) {
79+
public Assignment getAssignment(
80+
Params params,
81+
Collection<DiscoveryNode> candidateNodes,
82+
ClusterState clusterState,
83+
@Nullable ProjectId projectId
84+
) {
6785
DiscoveryNode discoveryNode = selectLeastLoadedNode(clusterState, candidateNodes, DiscoveryNode::canContainData);
6886
if (discoveryNode == null) {
6987
return NO_NODE_FOUND;
@@ -105,7 +123,7 @@ protected DiscoveryNode selectLeastLoadedNode(
105123
* <p>
106124
* Throws an exception if the supplied params cannot be executed on the cluster in the current state.
107125
*/
108-
public void validate(Params params, ClusterState clusterState) {}
126+
public void validate(Params params, ClusterState clusterState, @Nullable ProjectId projectId) {}
109127

110128
/**
111129
* Creates a AllocatedPersistentTask for communicating with task manager

server/src/test/java/org/elasticsearch/persistent/PersistentTasksClusterServiceTests.java

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
import org.elasticsearch.cluster.metadata.IndexMetadata;
2121
import org.elasticsearch.cluster.metadata.Metadata;
2222
import org.elasticsearch.cluster.metadata.NodesShutdownMetadata;
23+
import org.elasticsearch.cluster.metadata.ProjectId;
2324
import org.elasticsearch.cluster.metadata.ProjectMetadata;
2425
import org.elasticsearch.cluster.metadata.SingleNodeShutdownMetadata;
2526
import org.elasticsearch.cluster.node.DiscoveryNode;
@@ -1087,7 +1088,12 @@ public Scope scope() {
10871088
}
10881089

10891090
@Override
1090-
public Assignment getAssignment(P params, Collection<DiscoveryNode> candidateNodes, ClusterState clusterState) {
1091+
public Assignment getAssignment(
1092+
P params,
1093+
Collection<DiscoveryNode> candidateNodes,
1094+
ClusterState clusterState,
1095+
ProjectId projectId
1096+
) {
10911097
return fn.apply(params, candidateNodes, clusterState);
10921098
}
10931099

server/src/test/java/org/elasticsearch/persistent/TestPersistentTasksPlugin.java

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
import org.elasticsearch.client.internal.ElasticsearchClient;
2626
import org.elasticsearch.cluster.ClusterState;
2727
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
28+
import org.elasticsearch.cluster.metadata.ProjectId;
2829
import org.elasticsearch.cluster.node.DiscoveryNode;
2930
import org.elasticsearch.cluster.service.ClusterService;
3031
import org.elasticsearch.common.Strings;
@@ -326,12 +327,17 @@ public static void setNonClusterStateCondition(boolean nonClusterStateCondition)
326327
}
327328

328329
@Override
329-
public Assignment getAssignment(TestParams params, Collection<DiscoveryNode> candidateNodes, ClusterState clusterState) {
330+
public Assignment getAssignment(
331+
TestParams params,
332+
Collection<DiscoveryNode> candidateNodes,
333+
ClusterState clusterState,
334+
ProjectId projectId
335+
) {
330336
if (nonClusterStateCondition == false) {
331337
return new Assignment(null, "non cluster state condition prevents assignment");
332338
}
333339
if (params == null || params.getExecutorNodeAttr() == null) {
334-
return super.getAssignment(params, candidateNodes, clusterState);
340+
return super.getAssignment(params, candidateNodes, clusterState, projectId);
335341
} else {
336342
DiscoveryNode executorNode = selectLeastLoadedNode(
337343
clusterState,

x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowTasksExecutor.java

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@
3232
import org.elasticsearch.cluster.metadata.AliasMetadata;
3333
import org.elasticsearch.cluster.metadata.IndexMetadata;
3434
import org.elasticsearch.cluster.metadata.MappingMetadata;
35+
import org.elasticsearch.cluster.metadata.ProjectId;
3536
import org.elasticsearch.cluster.node.DiscoveryNode;
3637
import org.elasticsearch.cluster.routing.IndexRoutingTable;
3738
import org.elasticsearch.cluster.routing.ShardRouting;
@@ -43,6 +44,7 @@
4344
import org.elasticsearch.common.util.concurrent.EsExecutors;
4445
import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
4546
import org.elasticsearch.core.CheckedConsumer;
47+
import org.elasticsearch.core.Nullable;
4648
import org.elasticsearch.core.TimeValue;
4749
import org.elasticsearch.index.Index;
4850
import org.elasticsearch.index.IndexNotFoundException;
@@ -118,7 +120,7 @@ public ShardFollowTasksExecutor(Client client, ThreadPool threadPool, ClusterSer
118120
}
119121

120122
@Override
121-
public void validate(ShardFollowTask params, ClusterState clusterState) {
123+
public void validate(ShardFollowTask params, ClusterState clusterState, @Nullable ProjectId projectId) {
122124
final IndexRoutingTable routingTable = clusterState.getRoutingTable().index(params.getFollowShardId().getIndex());
123125
final ShardRouting primaryShard = routingTable.shard(params.getFollowShardId().id()).primaryShard();
124126
if (primaryShard.active() == false) {
@@ -131,8 +133,9 @@ public void validate(ShardFollowTask params, ClusterState clusterState) {
131133
@Override
132134
public Assignment getAssignment(
133135
final ShardFollowTask params,
134-
Collection<DiscoveryNode> candidateNodes,
135-
final ClusterState clusterState
136+
final Collection<DiscoveryNode> candidateNodes,
137+
final ClusterState clusterState,
138+
@Nullable final ProjectId projectId
136139
) {
137140
final DiscoveryNode node = selectLeastLoadedNode(
138141
clusterState,

x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardFollowTasksExecutorAssignmentTests.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -93,7 +93,8 @@ private void runAssignmentTest(
9393
final Assignment assignment = executor.getAssignment(
9494
mock(ShardFollowTask.class),
9595
clusterStateBuilder.nodes().getAllNodes(),
96-
clusterStateBuilder.build()
96+
clusterStateBuilder.build(),
97+
null
9798
);
9899
consumer.accept(theSpecial, assignment);
99100
}

x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/TransformMetadata.java

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
import org.elasticsearch.cluster.Diff;
1414
import org.elasticsearch.cluster.NamedDiff;
1515
import org.elasticsearch.cluster.metadata.Metadata;
16+
import org.elasticsearch.cluster.metadata.ProjectId;
1617
import org.elasticsearch.common.Strings;
1718
import org.elasticsearch.common.collect.Iterators;
1819
import org.elasticsearch.common.io.stream.StreamInput;
@@ -209,6 +210,9 @@ public TransformMetadata build() {
209210
}
210211
}
211212

213+
/**
214+
* @deprecated use {@link #transformMetadata(ClusterState, ProjectId)}
215+
*/
212216
@Deprecated(forRemoval = true)
213217
public static TransformMetadata getTransformMetadata(ClusterState state) {
214218
TransformMetadata TransformMetadata = (state == null) ? null : state.metadata().getSingleProjectCustom(TYPE);
@@ -218,6 +222,17 @@ public static TransformMetadata getTransformMetadata(ClusterState state) {
218222
return TransformMetadata;
219223
}
220224

225+
public static TransformMetadata transformMetadata(@Nullable ClusterState state, @Nullable ProjectId projectId) {
226+
if (state == null || projectId == null) {
227+
return EMPTY_METADATA;
228+
}
229+
TransformMetadata transformMetadata = state.metadata().getProject(projectId).custom(TYPE);
230+
if (transformMetadata == null) {
231+
return EMPTY_METADATA;
232+
}
233+
return transformMetadata;
234+
}
235+
221236
public static boolean upgradeMode(ClusterState state) {
222237
return getTransformMetadata(state).upgradeMode();
223238
}

x-pack/plugin/downsample/src/main/java/org/elasticsearch/xpack/downsample/DownsampleShardPersistentTaskExecutor.java

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -22,13 +22,15 @@
2222
import org.elasticsearch.action.support.TransportAction;
2323
import org.elasticsearch.client.internal.Client;
2424
import org.elasticsearch.cluster.ClusterState;
25+
import org.elasticsearch.cluster.metadata.ProjectId;
2526
import org.elasticsearch.cluster.node.DiscoveryNode;
2627
import org.elasticsearch.cluster.routing.IndexShardRoutingTable;
2728
import org.elasticsearch.cluster.routing.ShardRouting;
2829
import org.elasticsearch.common.io.stream.StreamOutput;
2930
import org.elasticsearch.common.settings.Settings;
3031
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
3132
import org.elasticsearch.common.util.concurrent.EsExecutors;
33+
import org.elasticsearch.core.Nullable;
3234
import org.elasticsearch.index.IndexNotFoundException;
3335
import org.elasticsearch.index.mapper.TimeSeriesIdFieldMapper;
3436
import org.elasticsearch.index.shard.ShardId;
@@ -116,7 +118,7 @@ protected AllocatedPersistentTask createTask(
116118
}
117119

118120
@Override
119-
public void validate(DownsampleShardTaskParams params, ClusterState clusterState) {
121+
public void validate(DownsampleShardTaskParams params, ClusterState clusterState, @Nullable ProjectId projectId) {
120122
// This is just a pre-check, but doesn't prevent from avoiding from aborting the task when source index disappeared
121123
// after initial creation of the persistent task.
122124
var indexShardRouting = findShardRoutingTable(params.shardId(), clusterState);
@@ -129,7 +131,8 @@ public void validate(DownsampleShardTaskParams params, ClusterState clusterState
129131
public PersistentTasksCustomMetadata.Assignment getAssignment(
130132
final DownsampleShardTaskParams params,
131133
final Collection<DiscoveryNode> candidateNodes,
132-
final ClusterState clusterState
134+
final ClusterState clusterState,
135+
@Nullable final ProjectId projectId
133136
) {
134137
// NOTE: downsampling works by running a task per each shard of the source index.
135138
// Here we make sure we assign the task to the actual node holding the shard identified by

x-pack/plugin/downsample/src/test/java/org/elasticsearch/xpack/downsample/DownsampleShardPersistentTaskExecutorTests.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -96,7 +96,7 @@ public void testGetAssignment() {
9696
Strings.EMPTY_ARRAY,
9797
Strings.EMPTY_ARRAY
9898
);
99-
var result = executor.getAssignment(params, Set.of(node), clusterState);
99+
var result = executor.getAssignment(params, Set.of(node), clusterState, null);
100100
assertThat(result.getExecutorNode(), equalTo(node.getId()));
101101
}
102102

@@ -128,7 +128,7 @@ public void testGetAssignmentMissingIndex() {
128128
Strings.EMPTY_ARRAY,
129129
Strings.EMPTY_ARRAY
130130
);
131-
var result = executor.getAssignment(params, Set.of(node), clusterState);
131+
var result = executor.getAssignment(params, Set.of(node), clusterState, null);
132132
assertThat(result.getExecutorNode(), equalTo(node.getId()));
133133
assertThat(result.getExplanation(), equalTo("a node to fail and stop this persistent task"));
134134
}

0 commit comments

Comments
 (0)