Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,14 @@
import org.elasticsearch.cluster.ClusterChangedEvent;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ClusterStateListener;
import org.elasticsearch.cluster.metadata.ProjectId;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
import org.elasticsearch.common.settings.ClusterSettings;
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.core.Nullable;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.node.NodeClosedException;
import org.elasticsearch.persistent.AllocatedPersistentTask;
Expand Down Expand Up @@ -136,7 +138,8 @@ protected HealthNode createTask(
public PersistentTasksCustomMetadata.Assignment getAssignment(
HealthNodeTaskParams params,
Collection<DiscoveryNode> candidateNodes,
ClusterState clusterState
ClusterState clusterState,
@Nullable ProjectId projectId
) {
DiscoveryNode discoveryNode = selectLeastLoadedNode(clusterState, candidateNodes, DiscoveryNode::canContainData);
if (discoveryNode == null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -171,9 +171,9 @@ public ClusterState execute(ClusterState currentState) {
assert (projectId == null && taskExecutor.scope() == PersistentTasksExecutor.Scope.CLUSTER)
|| (projectId != null && taskExecutor.scope() == PersistentTasksExecutor.Scope.PROJECT)
: "inconsistent project-id [" + projectId + "] and task scope [" + taskExecutor.scope() + "]";
taskExecutor.validate(taskParams, currentState);
taskExecutor.validate(taskParams, currentState, projectId);

Assignment assignment = createAssignment(taskName, taskParams, currentState);
Assignment assignment = createAssignment(taskName, taskParams, currentState, projectId);
logger.debug("creating {} persistent task [{}] with assignment [{}]", taskTypeString(projectId), taskName, assignment);
return builder.addTask(taskId, taskName, taskParams, assignment).buildAndUpdate(currentState, projectId);
}
Expand Down Expand Up @@ -449,7 +449,8 @@ public void clusterStateProcessed(ClusterState oldState, ClusterState newState)
private <Params extends PersistentTaskParams> Assignment createAssignment(
final String taskName,
final Params taskParams,
final ClusterState currentState
final ClusterState currentState,
@Nullable final ProjectId projectId
) {
PersistentTasksExecutor<Params> persistentTasksExecutor = registry.getPersistentTaskExecutorSafe(taskName);

Expand All @@ -468,7 +469,7 @@ private <Params extends PersistentTaskParams> Assignment createAssignment(
// Task assignment should not rely on node order
Randomness.shuffle(candidateNodes);

final Assignment assignment = persistentTasksExecutor.getAssignment(taskParams, candidateNodes, currentState);
final Assignment assignment = persistentTasksExecutor.getAssignment(taskParams, candidateNodes, currentState, projectId);
assert assignment != null : "getAssignment() should always return an Assignment object, containing a node or a reason why not";
assert (assignment.getExecutorNode() == null
|| currentState.metadata().nodeShutdowns().contains(assignment.getExecutorNode()) == false)
Expand Down Expand Up @@ -540,8 +541,8 @@ public void clusterStateProcessed(ClusterState oldState, ClusterState newState)
* persistent tasks changed.
*/
boolean shouldReassignPersistentTasks(final ClusterChangedEvent event) {
final List<PersistentTasks> allTasks = PersistentTasks.getAllTasks(event.state()).map(Tuple::v2).toList();
if (allTasks.isEmpty()) {
var projectIdToTasksIterator = PersistentTasks.getAllTasks(event.state()).iterator();
if (projectIdToTasksIterator.hasNext() == false) {
return false;
}

Expand All @@ -553,10 +554,16 @@ boolean shouldReassignPersistentTasks(final ClusterChangedEvent event) {
|| event.metadataChanged()
|| masterChanged) {

for (PersistentTasks tasks : allTasks) {
for (PersistentTask<?> task : tasks.tasks()) {
while (projectIdToTasksIterator.hasNext()) {
var projectIdToTasks = projectIdToTasksIterator.next();
for (PersistentTask<?> task : projectIdToTasks.v2().tasks()) {
if (needsReassignment(task.getAssignment(), event.state().nodes())) {
Assignment assignment = createAssignment(task.getTaskName(), task.getParams(), event.state());
Assignment assignment = createAssignment(
task.getTaskName(),
task.getParams(),
event.state(),
projectIdToTasks.v1()
);
if (Objects.equals(assignment, task.getAssignment()) == false) {
return true;
}
Expand Down Expand Up @@ -602,7 +609,7 @@ private ClusterState reassignClusterOrSingleProjectTasks(@Nullable final Project
// We need to check if removed nodes were running any of the tasks and reassign them
for (PersistentTask<?> task : tasks.tasks()) {
if (needsReassignment(task.getAssignment(), nodes)) {
Assignment assignment = createAssignment(task.getTaskName(), task.getParams(), clusterState);
Assignment assignment = createAssignment(task.getTaskName(), task.getParams(), clusterState, projectId);
if (Objects.equals(assignment, task.getAssignment()) == false) {
logger.trace(
"reassigning {} task {} from node {} to node {}",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
package org.elasticsearch.persistent;

import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.ProjectId;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.core.Nullable;
import org.elasticsearch.core.Tuple;
Expand Down Expand Up @@ -63,7 +64,12 @@ public Scope scope() {
* <p>
* The default implementation returns the least loaded data node from amongst the collection of candidate nodes
*/
public Assignment getAssignment(Params params, Collection<DiscoveryNode> candidateNodes, ClusterState clusterState) {
public Assignment getAssignment(
Params params,
Collection<DiscoveryNode> candidateNodes,
ClusterState clusterState,
@Nullable ProjectId projectId
) {
DiscoveryNode discoveryNode = selectLeastLoadedNode(clusterState, candidateNodes, DiscoveryNode::canContainData);
if (discoveryNode == null) {
return NO_NODE_FOUND;
Expand Down Expand Up @@ -105,7 +111,7 @@ protected DiscoveryNode selectLeastLoadedNode(
* <p>
* Throws an exception if the supplied params cannot be executed on the cluster in the current state.
*/
public void validate(Params params, ClusterState clusterState) {}
public void validate(Params params, ClusterState clusterState, @Nullable ProjectId projectId) {}

/**
* Creates a AllocatedPersistentTask for communicating with task manager
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import org.elasticsearch.cluster.metadata.IndexMetadata;
import org.elasticsearch.cluster.metadata.Metadata;
import org.elasticsearch.cluster.metadata.NodesShutdownMetadata;
import org.elasticsearch.cluster.metadata.ProjectId;
import org.elasticsearch.cluster.metadata.ProjectMetadata;
import org.elasticsearch.cluster.metadata.SingleNodeShutdownMetadata;
import org.elasticsearch.cluster.node.DiscoveryNode;
Expand Down Expand Up @@ -1087,7 +1088,12 @@ public Scope scope() {
}

@Override
public Assignment getAssignment(P params, Collection<DiscoveryNode> candidateNodes, ClusterState clusterState) {
public Assignment getAssignment(
P params,
Collection<DiscoveryNode> candidateNodes,
ClusterState clusterState,
ProjectId projectId
) {
return fn.apply(params, candidateNodes, clusterState);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.elasticsearch.client.internal.ElasticsearchClient;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.metadata.ProjectId;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.Strings;
Expand Down Expand Up @@ -326,12 +327,17 @@ public static void setNonClusterStateCondition(boolean nonClusterStateCondition)
}

@Override
public Assignment getAssignment(TestParams params, Collection<DiscoveryNode> candidateNodes, ClusterState clusterState) {
public Assignment getAssignment(
TestParams params,
Collection<DiscoveryNode> candidateNodes,
ClusterState clusterState,
ProjectId projectId
) {
if (nonClusterStateCondition == false) {
return new Assignment(null, "non cluster state condition prevents assignment");
}
if (params == null || params.getExecutorNodeAttr() == null) {
return super.getAssignment(params, candidateNodes, clusterState);
return super.getAssignment(params, candidateNodes, clusterState, projectId);
} else {
DiscoveryNode executorNode = selectLeastLoadedNode(
clusterState,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import org.elasticsearch.cluster.metadata.AliasMetadata;
import org.elasticsearch.cluster.metadata.IndexMetadata;
import org.elasticsearch.cluster.metadata.MappingMetadata;
import org.elasticsearch.cluster.metadata.ProjectId;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.routing.IndexRoutingTable;
import org.elasticsearch.cluster.routing.ShardRouting;
Expand All @@ -43,6 +44,7 @@
import org.elasticsearch.common.util.concurrent.EsExecutors;
import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
import org.elasticsearch.core.CheckedConsumer;
import org.elasticsearch.core.Nullable;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.index.Index;
import org.elasticsearch.index.IndexNotFoundException;
Expand Down Expand Up @@ -118,7 +120,7 @@ public ShardFollowTasksExecutor(Client client, ThreadPool threadPool, ClusterSer
}

@Override
public void validate(ShardFollowTask params, ClusterState clusterState) {
public void validate(ShardFollowTask params, ClusterState clusterState, @Nullable ProjectId projectId) {
final IndexRoutingTable routingTable = clusterState.getRoutingTable().index(params.getFollowShardId().getIndex());
final ShardRouting primaryShard = routingTable.shard(params.getFollowShardId().id()).primaryShard();
if (primaryShard.active() == false) {
Expand All @@ -131,8 +133,9 @@ public void validate(ShardFollowTask params, ClusterState clusterState) {
@Override
public Assignment getAssignment(
final ShardFollowTask params,
Collection<DiscoveryNode> candidateNodes,
final ClusterState clusterState
final Collection<DiscoveryNode> candidateNodes,
final ClusterState clusterState,
@Nullable final ProjectId projectId
) {
final DiscoveryNode node = selectLeastLoadedNode(
clusterState,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,8 @@ private void runAssignmentTest(
final Assignment assignment = executor.getAssignment(
mock(ShardFollowTask.class),
clusterStateBuilder.nodes().getAllNodes(),
clusterStateBuilder.build()
clusterStateBuilder.build(),
null
);
consumer.accept(theSpecial, assignment);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
import org.elasticsearch.cluster.Diff;
import org.elasticsearch.cluster.NamedDiff;
import org.elasticsearch.cluster.metadata.Metadata;
import org.elasticsearch.cluster.metadata.ProjectId;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.collect.Iterators;
import org.elasticsearch.common.io.stream.StreamInput;
Expand Down Expand Up @@ -209,6 +210,9 @@ public TransformMetadata build() {
}
}

/**
* @deprecated use {@link #transformMetadata(ClusterState, ProjectId)}
*/
@Deprecated(forRemoval = true)
public static TransformMetadata getTransformMetadata(ClusterState state) {
TransformMetadata TransformMetadata = (state == null) ? null : state.metadata().getSingleProjectCustom(TYPE);
Expand All @@ -218,6 +222,17 @@ public static TransformMetadata getTransformMetadata(ClusterState state) {
return TransformMetadata;
}

public static TransformMetadata transformMetadata(@Nullable ClusterState state, @Nullable ProjectId projectId) {
if (state == null || projectId == null) {
return EMPTY_METADATA;
}
TransformMetadata transformMetadata = state.metadata().getProject(projectId).custom(TYPE);
if (transformMetadata == null) {
return EMPTY_METADATA;
}
return transformMetadata;
}

public static boolean upgradeMode(ClusterState state) {
return getTransformMetadata(state).upgradeMode();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,12 +22,14 @@
import org.elasticsearch.action.support.TransportAction;
import org.elasticsearch.client.internal.Client;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.ProjectId;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.routing.IndexShardRoutingTable;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
import org.elasticsearch.common.util.concurrent.EsExecutors;
import org.elasticsearch.core.Nullable;
import org.elasticsearch.index.IndexNotFoundException;
import org.elasticsearch.index.mapper.TimeSeriesIdFieldMapper;
import org.elasticsearch.index.shard.ShardId;
Expand Down Expand Up @@ -113,7 +115,7 @@ protected AllocatedPersistentTask createTask(
}

@Override
public void validate(DownsampleShardTaskParams params, ClusterState clusterState) {
public void validate(DownsampleShardTaskParams params, ClusterState clusterState, @Nullable ProjectId projectId) {
// This is just a pre-check, but doesn't prevent from avoiding from aborting the task when source index disappeared
// after initial creation of the persistent task.
var indexShardRouting = findShardRoutingTable(params.shardId(), clusterState);
Expand All @@ -126,7 +128,8 @@ public void validate(DownsampleShardTaskParams params, ClusterState clusterState
public PersistentTasksCustomMetadata.Assignment getAssignment(
final DownsampleShardTaskParams params,
final Collection<DiscoveryNode> candidateNodes,
final ClusterState clusterState
final ClusterState clusterState,
@Nullable final ProjectId projectId
) {
// NOTE: downsampling works by running a task per each shard of the source index.
// Here we make sure we assign the task to the actual node holding the shard identified by
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ public void testGetAssignment() {
Strings.EMPTY_ARRAY,
Strings.EMPTY_ARRAY
);
var result = executor.getAssignment(params, Set.of(node), clusterState);
var result = executor.getAssignment(params, Set.of(node), clusterState, null);
assertThat(result.getExecutorNode(), equalTo(node.getId()));
}

Expand Down Expand Up @@ -119,7 +119,7 @@ public void testGetAssignmentMissingIndex() {
Strings.EMPTY_ARRAY,
Strings.EMPTY_ARRAY
);
var result = executor.getAssignment(params, Set.of(node), clusterState);
var result = executor.getAssignment(params, Set.of(node), clusterState, null);
assertThat(result.getExecutorNode(), equalTo(node.getId()));
assertThat(result.getExplanation(), equalTo("a node to fail and stop this persistent task"));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,12 @@

import org.elasticsearch.client.internal.Client;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.ProjectId;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
import org.elasticsearch.common.settings.IndexScopedSettings;
import org.elasticsearch.core.Nullable;
import org.elasticsearch.indices.SystemIndices;
import org.elasticsearch.persistent.AllocatedPersistentTask;
import org.elasticsearch.persistent.PersistentTaskParams;
Expand Down Expand Up @@ -89,7 +91,8 @@ protected AllocatedPersistentTask createTask(
public PersistentTasksCustomMetadata.Assignment getAssignment(
SystemIndexMigrationTaskParams params,
Collection<DiscoveryNode> candidateNodes,
ClusterState clusterState
ClusterState clusterState,
@Nullable ProjectId projectId
) {
// This should select from master-eligible nodes because we already require all master-eligible nodes to have all plugins installed.
// However, due to a misunderstanding, this code as-written needs to run on the master node in particular. This is not a fundamental
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.elasticsearch.cluster.block.ClusterBlockException;
import org.elasticsearch.cluster.block.ClusterBlockLevel;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.metadata.ProjectId;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.project.ProjectResolver;
import org.elasticsearch.cluster.service.ClusterService;
Expand All @@ -30,6 +31,7 @@
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.util.concurrent.EsExecutors;
import org.elasticsearch.core.Nullable;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.index.IndexNotFoundException;
import org.elasticsearch.injection.guice.Inject;
Expand Down Expand Up @@ -693,7 +695,8 @@ protected AllocatedPersistentTask createTask(
public PersistentTasksCustomMetadata.Assignment getAssignment(
TaskParams params,
Collection<DiscoveryNode> candidateNodes,
@SuppressWarnings("HiddenField") ClusterState clusterState
@SuppressWarnings("HiddenField") ClusterState clusterState,
@Nullable ProjectId projectId
) {
boolean isMemoryTrackerRecentlyRefreshed = memoryTracker.isRecentlyRefreshed();
Optional<PersistentTasksCustomMetadata.Assignment> optionalAssignment = getPotentialAssignment(
Expand Down
Loading