-
Notifications
You must be signed in to change notification settings - Fork 25.6k
Make IndexLifecycleService project-aware
#129932
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 1 commit
ff3ec37
6acbb7c
7a95259
38297ff
6444f6d
67e3dba
1e515c8
59855ef
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -18,7 +18,6 @@ | |
| import org.elasticsearch.cluster.ProjectState; | ||
| import org.elasticsearch.cluster.metadata.IndexMetadata; | ||
| import org.elasticsearch.cluster.metadata.LifecycleExecutionState; | ||
| import org.elasticsearch.cluster.metadata.Metadata; | ||
| import org.elasticsearch.cluster.metadata.ProjectMetadata; | ||
| import org.elasticsearch.cluster.metadata.SingleNodeShutdownMetadata; | ||
| import org.elasticsearch.cluster.service.ClusterService; | ||
|
|
@@ -28,7 +27,6 @@ | |
| import org.elasticsearch.common.scheduler.TimeValueSchedule; | ||
| import org.elasticsearch.common.settings.Settings; | ||
| import org.elasticsearch.common.util.concurrent.AbstractRunnable; | ||
| import org.elasticsearch.core.FixForMultiProject; | ||
| import org.elasticsearch.core.Nullable; | ||
| import org.elasticsearch.core.SuppressForbidden; | ||
| import org.elasticsearch.core.TimeValue; | ||
|
|
@@ -183,9 +181,12 @@ public ProjectMetadata moveIndicesToPreviouslyFailedStep(ProjectMetadata current | |
| void onMaster(ClusterState clusterState) { | ||
| maybeScheduleJob(); | ||
|
|
||
| // TODO multi-project: this probably needs a per-project iteration | ||
| @FixForMultiProject | ||
| final ProjectState state = clusterState.projectState(Metadata.DEFAULT_PROJECT_ID); | ||
| for (var projectId : clusterState.metadata().projects().keySet()) { | ||
| onMaster(clusterState.projectState(projectId)); | ||
| } | ||
| } | ||
|
|
||
| void onMaster(ProjectState state) { | ||
| final ProjectMetadata projectMetadata = state.metadata(); | ||
| final IndexLifecycleMetadata currentMetadata = projectMetadata.custom(IndexLifecycleMetadata.TYPE); | ||
| if (currentMetadata != null) { | ||
|
|
@@ -416,20 +417,21 @@ public void applyClusterState(ClusterChangedEvent event) { | |
| return; | ||
| } | ||
|
|
||
| @FixForMultiProject | ||
| final IndexLifecycleMetadata ilmMetadata = event.state() | ||
| .metadata() | ||
| .getProject(Metadata.DEFAULT_PROJECT_ID) | ||
| .custom(IndexLifecycleMetadata.TYPE); | ||
| if (ilmMetadata == null) { | ||
| return; | ||
| } | ||
| final IndexLifecycleMetadata previousIlmMetadata = event.previousState() | ||
| .metadata() | ||
| .getProject(Metadata.DEFAULT_PROJECT_ID) | ||
| .custom(IndexLifecycleMetadata.TYPE); | ||
| if (event.previousState().nodes().isLocalNodeElectedMaster() == false || ilmMetadata != previousIlmMetadata) { | ||
| policyRegistry.update(ilmMetadata); | ||
| // We're updating the policy registry cache here, which doesn't actually work with multiple projects because the policies from one | ||
| // project would overwrite the policies from another project. However, since we're not planning on running ILM in a multi-project | ||
| // cluster, we can ignore this. | ||
| for (var project : event.state().metadata().projects().values()) { | ||
| final IndexLifecycleMetadata ilmMetadata = project.custom(IndexLifecycleMetadata.TYPE); | ||
| if (ilmMetadata == null) { | ||
| continue; | ||
| } | ||
| final var previousProject = event.previousState().metadata().projects().get(project.id()); | ||
| final IndexLifecycleMetadata previousIlmMetadata = previousProject == null | ||
| ? null | ||
| : previousProject.custom(IndexLifecycleMetadata.TYPE); | ||
| if (event.previousState().nodes().isLocalNodeElectedMaster() == false || ilmMetadata != previousIlmMetadata) { | ||
| policyRegistry.update(ilmMetadata); | ||
| } | ||
| } | ||
| } | ||
|
|
||
|
|
@@ -461,10 +463,13 @@ public boolean policyExists(String policyId) { | |
| * @param clusterState the current cluster state | ||
| * @param fromClusterStateChange whether things are triggered from the cluster-state-listener or the scheduler | ||
| */ | ||
| @FixForMultiProject | ||
| void triggerPolicies(ClusterState clusterState, boolean fromClusterStateChange) { | ||
| @FixForMultiProject | ||
| final var state = clusterState.projectState(Metadata.DEFAULT_PROJECT_ID); | ||
| for (var projectId : clusterState.metadata().projects().keySet()) { | ||
| triggerPolicies(clusterState.projectState(projectId), fromClusterStateChange); | ||
| } | ||
| } | ||
|
|
||
| void triggerPolicies(ProjectState state, boolean fromClusterStateChange) { | ||
| final var projectMetadata = state.metadata(); | ||
| IndexLifecycleMetadata currentMetadata = projectMetadata.custom(IndexLifecycleMetadata.TYPE); | ||
|
|
||
|
|
@@ -585,51 +590,54 @@ PolicyStepsRegistry getPolicyRegistry() { | |
| return policyRegistry; | ||
| } | ||
|
|
||
| static Set<String> indicesOnShuttingDownNodesInDangerousStep(ClusterState state, String nodeId) { | ||
| static boolean indicesOnShuttingDownNodesInDangerousStep(ClusterState state, String nodeId) { | ||
|
||
| final Set<String> shutdownNodes = PluginShutdownService.shutdownTypeNodes( | ||
| state, | ||
| SingleNodeShutdownMetadata.Type.REMOVE, | ||
| SingleNodeShutdownMetadata.Type.SIGTERM, | ||
| SingleNodeShutdownMetadata.Type.REPLACE | ||
| ); | ||
| if (shutdownNodes.isEmpty()) { | ||
| return Set.of(); | ||
| return true; | ||
| } | ||
|
|
||
| // Returning a set of strings will cause weird behavior with multiple projects | ||
| @FixForMultiProject | ||
| Set<String> indicesPreventingShutdown = state.metadata() | ||
| .projects() | ||
| .values() | ||
| .stream() | ||
| .flatMap(project -> project.indices().entrySet().stream()) | ||
| // Filter out to only consider managed indices | ||
| .filter(indexToMetadata -> Strings.hasText(indexToMetadata.getValue().getLifecyclePolicyName())) | ||
| // Only look at indices in the shrink action | ||
| .filter(indexToMetadata -> ShrinkAction.NAME.equals(indexToMetadata.getValue().getLifecycleExecutionState().action())) | ||
| // Only look at indices on a step that may potentially be dangerous if we removed the node | ||
| .filter(indexToMetadata -> { | ||
| String step = indexToMetadata.getValue().getLifecycleExecutionState().step(); | ||
| return SetSingleNodeAllocateStep.NAME.equals(step) | ||
| || CheckShrinkReadyStep.NAME.equals(step) | ||
| || ShrinkStep.NAME.equals(step) | ||
| || ShrunkShardsAllocatedStep.NAME.equals(step); | ||
| }) | ||
| // Only look at indices where the node picked for the shrink is the node marked as shutting down | ||
| .filter(indexToMetadata -> { | ||
| String nodePicked = indexToMetadata.getValue() | ||
| .getSettings() | ||
| .get(IndexMetadata.INDEX_ROUTING_REQUIRE_GROUP_SETTING.getKey() + "_id"); | ||
| return nodeId.equals(nodePicked); | ||
| }) | ||
| .map(Map.Entry::getKey) | ||
| .collect(Collectors.toSet()); | ||
| logger.trace( | ||
| "with nodes marked as shutdown for removal {}, indices {} are preventing shutdown", | ||
| shutdownNodes, | ||
| indicesPreventingShutdown | ||
| ); | ||
| return indicesPreventingShutdown; | ||
| boolean result = true; | ||
| for (var project : state.metadata().projects().values()) { | ||
| Set<String> indicesPreventingShutdown = project.indices() | ||
| .entrySet() | ||
| .stream() | ||
| // Filter out to only consider managed indices | ||
| .filter(indexToMetadata -> Strings.hasText(indexToMetadata.getValue().getLifecyclePolicyName())) | ||
| // Only look at indices in the shrink action | ||
| .filter(indexToMetadata -> ShrinkAction.NAME.equals(indexToMetadata.getValue().getLifecycleExecutionState().action())) | ||
| // Only look at indices on a step that may potentially be dangerous if we removed the node | ||
| .filter(indexToMetadata -> { | ||
| String step = indexToMetadata.getValue().getLifecycleExecutionState().step(); | ||
| return SetSingleNodeAllocateStep.NAME.equals(step) | ||
| || CheckShrinkReadyStep.NAME.equals(step) | ||
| || ShrinkStep.NAME.equals(step) | ||
| || ShrunkShardsAllocatedStep.NAME.equals(step); | ||
| }) | ||
| // Only look at indices where the node picked for the shrink is the node marked as shutting down | ||
| .filter(indexToMetadata -> { | ||
| String nodePicked = indexToMetadata.getValue() | ||
| .getSettings() | ||
| .get(IndexMetadata.INDEX_ROUTING_REQUIRE_GROUP_SETTING.getKey() + "_id"); | ||
| return nodeId.equals(nodePicked); | ||
| }) | ||
| .map(Map.Entry::getKey) | ||
| .collect(Collectors.toSet()); | ||
| logger.trace( | ||
| "with nodes marked as shutdown for removal {}, indices {} in project {} are preventing shutdown", | ||
| shutdownNodes, | ||
| indicesPreventingShutdown, | ||
| project.id() | ||
| ); | ||
| if (indicesPreventingShutdown.isEmpty() == false) { | ||
| result = false; | ||
| } | ||
| } | ||
| return result; | ||
| } | ||
|
|
||
| @Override | ||
|
|
@@ -641,8 +649,7 @@ public boolean safeToShutdown(String nodeId, SingleNodeShutdownMetadata.Type shu | |
| case REPLACE: | ||
| case REMOVE: | ||
| case SIGTERM: | ||
| Set<String> indices = indicesOnShuttingDownNodesInDangerousStep(clusterService.state(), nodeId); | ||
| return indices.isEmpty(); | ||
| return indicesOnShuttingDownNodesInDangerousStep(clusterService.state(), nodeId); | ||
| default: | ||
| throw new IllegalArgumentException("unknown shutdown type: " + shutdownType); | ||
| } | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I plan to add an annotation and use it here before I merge this PR.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I wonder whether we could also assert that there's only one project? I would feel more comfortable knowing that a test would probably fail if we ever used the ILM plugin in a multi-project cluster, as well as having the annotation to help us remember.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I thought of an assertion too, but the
IndexTemplateRegistry already installs multiple ILM policies in every project, so an assertion here would already trip.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Also, I tried running the ILM Java REST tests and those do fail because of this (although it's not directly apparent from the failure that it's because of this registry cache).
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
But I thought that this entire module was excluded in serverless, so this code simply doesn't exist there.
If the issue is just that we're tripping integration tests which have multiple projects but don't exclude this module like serverless does... we could fix that?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I haven't thought about this deeply and it may not be possible, but I'd quite like to understand why if not.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yep, that's exactly what happens. All tests that make use of the
DEFAULT distribution type will trip this assertion. We might not need to run all of these tests in MP mode, but probably a significant portion of them we do - more than we can easily convert for this PR at least.