Skip to content

Commit bc515c4

Browse files
authored
Make IndexLifecycleService project-aware (#129932)
Updates the background service of ILM to process all projects in the cluster.
1 parent bd5f43f commit bc515c4

File tree

3 files changed

+101
-108
lines changed

3 files changed

+101
-108
lines changed

x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/ilm/IndexLifecycleService.java

Lines changed: 69 additions & 60 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,6 @@
1818
import org.elasticsearch.cluster.ProjectState;
1919
import org.elasticsearch.cluster.metadata.IndexMetadata;
2020
import org.elasticsearch.cluster.metadata.LifecycleExecutionState;
21-
import org.elasticsearch.cluster.metadata.Metadata;
2221
import org.elasticsearch.cluster.metadata.ProjectMetadata;
2322
import org.elasticsearch.cluster.metadata.SingleNodeShutdownMetadata;
2423
import org.elasticsearch.cluster.service.ClusterService;
@@ -28,7 +27,7 @@
2827
import org.elasticsearch.common.scheduler.TimeValueSchedule;
2928
import org.elasticsearch.common.settings.Settings;
3029
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
31-
import org.elasticsearch.core.FixForMultiProject;
30+
import org.elasticsearch.core.NotMultiProjectCapable;
3231
import org.elasticsearch.core.Nullable;
3332
import org.elasticsearch.core.SuppressForbidden;
3433
import org.elasticsearch.core.TimeValue;
@@ -183,9 +182,12 @@ public ProjectMetadata moveIndicesToPreviouslyFailedStep(ProjectMetadata current
183182
void onMaster(ClusterState clusterState) {
184183
maybeScheduleJob();
185184

186-
// TODO multi-project: this probably needs a per-project iteration
187-
@FixForMultiProject
188-
final ProjectState state = clusterState.projectState(Metadata.DEFAULT_PROJECT_ID);
185+
for (var projectId : clusterState.metadata().projects().keySet()) {
186+
onMaster(clusterState.projectState(projectId));
187+
}
188+
}
189+
190+
void onMaster(ProjectState state) {
189191
final ProjectMetadata projectMetadata = state.metadata();
190192
final IndexLifecycleMetadata currentMetadata = projectMetadata.custom(IndexLifecycleMetadata.TYPE);
191193
if (currentMetadata != null) {
@@ -409,27 +411,29 @@ public boolean isForceExecution() {
409411
});
410412
}
411413

414+
@NotMultiProjectCapable(description = "See comment inside the method")
412415
@Override
413416
public void applyClusterState(ClusterChangedEvent event) {
414417
// only act if we are master, otherwise keep idle until elected
415418
if (event.localNodeMaster() == false) {
416419
return;
417420
}
418421

419-
@FixForMultiProject
420-
final IndexLifecycleMetadata ilmMetadata = event.state()
421-
.metadata()
422-
.getProject(Metadata.DEFAULT_PROJECT_ID)
423-
.custom(IndexLifecycleMetadata.TYPE);
424-
if (ilmMetadata == null) {
425-
return;
426-
}
427-
final IndexLifecycleMetadata previousIlmMetadata = event.previousState()
428-
.metadata()
429-
.getProject(Metadata.DEFAULT_PROJECT_ID)
430-
.custom(IndexLifecycleMetadata.TYPE);
431-
if (event.previousState().nodes().isLocalNodeElectedMaster() == false || ilmMetadata != previousIlmMetadata) {
432-
policyRegistry.update(ilmMetadata);
422+
// We're updating the policy registry cache here, which doesn't actually work with multiple projects because the policies from one
423+
// project would overwrite the polices from another project. However, since we're not planning on running ILM in a multi-project
424+
// cluster, we can ignore this.
425+
for (var project : event.state().metadata().projects().values()) {
426+
final IndexLifecycleMetadata ilmMetadata = project.custom(IndexLifecycleMetadata.TYPE);
427+
if (ilmMetadata == null) {
428+
continue;
429+
}
430+
final var previousProject = event.previousState().metadata().projects().get(project.id());
431+
final IndexLifecycleMetadata previousIlmMetadata = previousProject == null
432+
? null
433+
: previousProject.custom(IndexLifecycleMetadata.TYPE);
434+
if (event.previousState().nodes().isLocalNodeElectedMaster() == false || ilmMetadata != previousIlmMetadata) {
435+
policyRegistry.update(ilmMetadata);
436+
}
433437
}
434438
}
435439

@@ -461,10 +465,13 @@ public boolean policyExists(String policyId) {
461465
* @param clusterState the current cluster state
462466
* @param fromClusterStateChange whether things are triggered from the cluster-state-listener or the scheduler
463467
*/
464-
@FixForMultiProject
465468
void triggerPolicies(ClusterState clusterState, boolean fromClusterStateChange) {
466-
@FixForMultiProject
467-
final var state = clusterState.projectState(Metadata.DEFAULT_PROJECT_ID);
469+
for (var projectId : clusterState.metadata().projects().keySet()) {
470+
triggerPolicies(clusterState.projectState(projectId), fromClusterStateChange);
471+
}
472+
}
473+
474+
void triggerPolicies(ProjectState state, boolean fromClusterStateChange) {
468475
final var projectMetadata = state.metadata();
469476
IndexLifecycleMetadata currentMetadata = projectMetadata.custom(IndexLifecycleMetadata.TYPE);
470477

@@ -585,51 +592,54 @@ PolicyStepsRegistry getPolicyRegistry() {
585592
return policyRegistry;
586593
}
587594

588-
static Set<String> indicesOnShuttingDownNodesInDangerousStep(ClusterState state, String nodeId) {
595+
static boolean hasIndicesInDangerousStepForNodeShutdown(ClusterState state, String nodeId) {
589596
final Set<String> shutdownNodes = PluginShutdownService.shutdownTypeNodes(
590597
state,
591598
SingleNodeShutdownMetadata.Type.REMOVE,
592599
SingleNodeShutdownMetadata.Type.SIGTERM,
593600
SingleNodeShutdownMetadata.Type.REPLACE
594601
);
595602
if (shutdownNodes.isEmpty()) {
596-
return Set.of();
603+
return true;
597604
}
598605

599-
// Returning a set of strings will cause weird behavior with multiple projects
600-
@FixForMultiProject
601-
Set<String> indicesPreventingShutdown = state.metadata()
602-
.projects()
603-
.values()
604-
.stream()
605-
.flatMap(project -> project.indices().entrySet().stream())
606-
// Filter out to only consider managed indices
607-
.filter(indexToMetadata -> Strings.hasText(indexToMetadata.getValue().getLifecyclePolicyName()))
608-
// Only look at indices in the shrink action
609-
.filter(indexToMetadata -> ShrinkAction.NAME.equals(indexToMetadata.getValue().getLifecycleExecutionState().action()))
610-
// Only look at indices on a step that may potentially be dangerous if we removed the node
611-
.filter(indexToMetadata -> {
612-
String step = indexToMetadata.getValue().getLifecycleExecutionState().step();
613-
return SetSingleNodeAllocateStep.NAME.equals(step)
614-
|| CheckShrinkReadyStep.NAME.equals(step)
615-
|| ShrinkStep.NAME.equals(step)
616-
|| ShrunkShardsAllocatedStep.NAME.equals(step);
617-
})
618-
// Only look at indices where the node picked for the shrink is the node marked as shutting down
619-
.filter(indexToMetadata -> {
620-
String nodePicked = indexToMetadata.getValue()
621-
.getSettings()
622-
.get(IndexMetadata.INDEX_ROUTING_REQUIRE_GROUP_SETTING.getKey() + "_id");
623-
return nodeId.equals(nodePicked);
624-
})
625-
.map(Map.Entry::getKey)
626-
.collect(Collectors.toSet());
627-
logger.trace(
628-
"with nodes marked as shutdown for removal {}, indices {} are preventing shutdown",
629-
shutdownNodes,
630-
indicesPreventingShutdown
631-
);
632-
return indicesPreventingShutdown;
606+
boolean result = true;
607+
for (var project : state.metadata().projects().values()) {
608+
Set<String> indicesPreventingShutdown = project.indices()
609+
.entrySet()
610+
.stream()
611+
// Filter out to only consider managed indices
612+
.filter(indexToMetadata -> Strings.hasText(indexToMetadata.getValue().getLifecyclePolicyName()))
613+
// Only look at indices in the shrink action
614+
.filter(indexToMetadata -> ShrinkAction.NAME.equals(indexToMetadata.getValue().getLifecycleExecutionState().action()))
615+
// Only look at indices on a step that may potentially be dangerous if we removed the node
616+
.filter(indexToMetadata -> {
617+
String step = indexToMetadata.getValue().getLifecycleExecutionState().step();
618+
return SetSingleNodeAllocateStep.NAME.equals(step)
619+
|| CheckShrinkReadyStep.NAME.equals(step)
620+
|| ShrinkStep.NAME.equals(step)
621+
|| ShrunkShardsAllocatedStep.NAME.equals(step);
622+
})
623+
// Only look at indices where the node picked for the shrink is the node marked as shutting down
624+
.filter(indexToMetadata -> {
625+
String nodePicked = indexToMetadata.getValue()
626+
.getSettings()
627+
.get(IndexMetadata.INDEX_ROUTING_REQUIRE_GROUP_SETTING.getKey() + "_id");
628+
return nodeId.equals(nodePicked);
629+
})
630+
.map(Map.Entry::getKey)
631+
.collect(Collectors.toSet());
632+
logger.trace(
633+
"with nodes marked as shutdown for removal {}, indices {} in project {} are preventing shutdown",
634+
shutdownNodes,
635+
indicesPreventingShutdown,
636+
project.id()
637+
);
638+
if (indicesPreventingShutdown.isEmpty() == false) {
639+
result = false;
640+
}
641+
}
642+
return result;
633643
}
634644

635645
@Override
@@ -641,8 +651,7 @@ public boolean safeToShutdown(String nodeId, SingleNodeShutdownMetadata.Type shu
641651
case REPLACE:
642652
case REMOVE:
643653
case SIGTERM:
644-
Set<String> indices = indicesOnShuttingDownNodesInDangerousStep(clusterService.state(), nodeId);
645-
return indices.isEmpty();
654+
return hasIndicesInDangerousStepForNodeShutdown(clusterService.state(), nodeId);
646655
default:
647656
throw new IllegalArgumentException("unknown shutdown type: " + shutdownType);
648657
}

x-pack/plugin/ilm/src/test/java/org/elasticsearch/xpack/ilm/IndexLifecycleServiceTests.java

Lines changed: 32 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
import org.elasticsearch.cluster.metadata.LifecycleExecutionState;
1919
import org.elasticsearch.cluster.metadata.Metadata;
2020
import org.elasticsearch.cluster.metadata.NodesShutdownMetadata;
21+
import org.elasticsearch.cluster.metadata.ProjectMetadata;
2122
import org.elasticsearch.cluster.metadata.SingleNodeShutdownMetadata;
2223
import org.elasticsearch.cluster.node.DiscoveryNode;
2324
import org.elasticsearch.cluster.node.DiscoveryNodeUtils;
@@ -169,13 +170,11 @@ public void testStoppedModeSkip() {
169170
.numberOfReplicas(randomIntBetween(0, 5))
170171
.build();
171172
Map<String, IndexMetadata> indices = Map.of(index.getName(), indexMetadata);
172-
Metadata metadata = Metadata.builder()
173+
var project = ProjectMetadata.builder(randomProjectIdOrDefault())
173174
.putCustom(IndexLifecycleMetadata.TYPE, new IndexLifecycleMetadata(policyMap, OperationMode.STOPPED))
174-
.indices(indices)
175-
.persistentSettings(settings(IndexVersion.current()).build())
176-
.build();
175+
.indices(indices);
177176
ClusterState currentState = ClusterState.builder(ClusterName.DEFAULT)
178-
.metadata(metadata)
177+
.putProjectMetadata(project)
179178
.nodes(DiscoveryNodes.builder().localNodeId(nodeId).masterNodeId(nodeId).add(masterNode).build())
180179
.build();
181180
ClusterChangedEvent event = new ClusterChangedEvent("_source", currentState, ClusterState.EMPTY_STATE);
@@ -208,13 +207,11 @@ public void testRequestedStopOnShrink() {
208207
.numberOfReplicas(randomIntBetween(0, 5))
209208
.build();
210209
Map<String, IndexMetadata> indices = Map.of(index.getName(), indexMetadata);
211-
Metadata metadata = Metadata.builder()
210+
var project = ProjectMetadata.builder(randomProjectIdOrDefault())
212211
.putCustom(IndexLifecycleMetadata.TYPE, new IndexLifecycleMetadata(policyMap, OperationMode.STOPPING))
213-
.indices(indices)
214-
.persistentSettings(settings(IndexVersion.current()).build())
215-
.build();
212+
.indices(indices);
216213
ClusterState currentState = ClusterState.builder(ClusterName.DEFAULT)
217-
.metadata(metadata)
214+
.putProjectMetadata(project)
218215
.nodes(DiscoveryNodes.builder().localNodeId(nodeId).masterNodeId(nodeId).add(masterNode).build())
219216
.build();
220217

@@ -264,13 +261,11 @@ private void verifyCanStopWithStep(String stoppableStep) {
264261
.numberOfReplicas(randomIntBetween(0, 5))
265262
.build();
266263
Map<String, IndexMetadata> indices = Map.of(index.getName(), indexMetadata);
267-
Metadata metadata = Metadata.builder()
264+
var project = ProjectMetadata.builder(randomProjectIdOrDefault())
268265
.putCustom(IndexLifecycleMetadata.TYPE, new IndexLifecycleMetadata(policyMap, OperationMode.STOPPING))
269-
.indices(indices)
270-
.persistentSettings(settings(IndexVersion.current()).build())
271-
.build();
266+
.indices(indices);
272267
ClusterState currentState = ClusterState.builder(ClusterName.DEFAULT)
273-
.metadata(metadata)
268+
.putProjectMetadata(project)
274269
.nodes(DiscoveryNodes.builder().localNodeId(nodeId).masterNodeId(nodeId).add(masterNode).build())
275270
.build();
276271

@@ -312,13 +307,11 @@ public void testRequestedStopOnSafeAction() {
312307
.numberOfReplicas(randomIntBetween(0, 5))
313308
.build();
314309
Map<String, IndexMetadata> indices = Map.of(index.getName(), indexMetadata);
315-
Metadata metadata = Metadata.builder()
310+
var project = ProjectMetadata.builder(randomProjectIdOrDefault())
316311
.putCustom(IndexLifecycleMetadata.TYPE, new IndexLifecycleMetadata(policyMap, OperationMode.STOPPING))
317-
.indices(indices)
318-
.persistentSettings(settings(IndexVersion.current()).build())
319-
.build();
312+
.indices(indices);
320313
ClusterState currentState = ClusterState.builder(ClusterName.DEFAULT)
321-
.metadata(metadata)
314+
.putProjectMetadata(project)
322315
.nodes(DiscoveryNodes.builder().localNodeId(nodeId).masterNodeId(nodeId).add(masterNode).build())
323316
.build();
324317

@@ -429,11 +422,9 @@ public void doTestExceptionStillProcessesOtherIndices(boolean useOnMaster) {
429422
.build();
430423
Map<String, IndexMetadata> indices = Map.of(index1.getName(), i1indexMetadata, index2.getName(), i2indexMetadata);
431424

432-
Metadata metadata = Metadata.builder()
425+
var project = ProjectMetadata.builder(randomProjectIdOrDefault())
433426
.putCustom(IndexLifecycleMetadata.TYPE, new IndexLifecycleMetadata(policyMap, OperationMode.RUNNING))
434-
.indices(indices)
435-
.persistentSettings(settings(IndexVersion.current()).build())
436-
.build();
427+
.indices(indices);
437428

438429
Settings settings = Settings.builder().put(LifecycleSettings.LIFECYCLE_POLL_INTERVAL, "1s").build();
439430
var clusterSettings = new ClusterSettings(
@@ -443,7 +434,7 @@ public void doTestExceptionStillProcessesOtherIndices(boolean useOnMaster) {
443434
ClusterService clusterService = ClusterServiceUtils.createClusterService(threadPool, clusterSettings);
444435
DiscoveryNode node = clusterService.localNode();
445436
ClusterState currentState = ClusterState.builder(ClusterName.DEFAULT)
446-
.metadata(metadata)
437+
.putProjectMetadata(project)
447438
.nodes(DiscoveryNodes.builder().add(node).masterNodeId(node.getId()).localNodeId(node.getId()))
448439
.build();
449440
ClusterServiceUtils.setState(clusterService, currentState);
@@ -533,15 +524,16 @@ public void testParsingOriginationDateBeforeIndexCreation() {
533524
}
534525
}
535526

536-
public void testIndicesOnShuttingDownNodesInDangerousStep() {
527+
public void testHasIndicesInDangerousStepForNodeShutdown() {
537528
for (SingleNodeShutdownMetadata.Type type : List.of(
538529
SingleNodeShutdownMetadata.Type.REMOVE,
539530
SingleNodeShutdownMetadata.Type.SIGTERM,
540531
SingleNodeShutdownMetadata.Type.REPLACE
541532
)) {
542-
ClusterState state = ClusterState.builder(ClusterName.DEFAULT).build();
543-
assertThat(IndexLifecycleService.indicesOnShuttingDownNodesInDangerousStep(state, "regular_node"), equalTo(Set.of()));
544-
assertThat(IndexLifecycleService.indicesOnShuttingDownNodesInDangerousStep(state, "shutdown_node"), equalTo(Set.of()));
533+
final var project = ProjectMetadata.builder(randomProjectIdOrDefault()).build();
534+
ClusterState state = ClusterState.builder(ClusterName.DEFAULT).putProjectMetadata(project).build();
535+
assertThat(IndexLifecycleService.hasIndicesInDangerousStepForNodeShutdown(state, "regular_node"), equalTo(true));
536+
assertThat(IndexLifecycleService.hasIndicesInDangerousStepForNodeShutdown(state, "shutdown_node"), equalTo(true));
545537

546538
IndexMetadata nonDangerousIndex = IndexMetadata.builder("no_danger")
547539
.settings(settings(IndexVersion.current()).put(LifecycleSettings.LIFECYCLE_NAME, "mypolicy"))
@@ -583,14 +575,12 @@ public void testIndicesOnShuttingDownNodesInDangerousStep() {
583575
.build();
584576
Map<String, IndexMetadata> indices = Map.of("no_danger", nonDangerousIndex, "danger", dangerousIndex);
585577

586-
Metadata metadata = Metadata.builder()
587-
.putCustom(IndexLifecycleMetadata.TYPE, new IndexLifecycleMetadata(Map.of(), OperationMode.RUNNING))
588-
.indices(indices)
589-
.persistentSettings(settings(IndexVersion.current()).build())
590-
.build();
591-
592578
state = ClusterState.builder(ClusterName.DEFAULT)
593-
.metadata(metadata)
579+
.putProjectMetadata(
580+
ProjectMetadata.builder(project)
581+
.putCustom(IndexLifecycleMetadata.TYPE, new IndexLifecycleMetadata(Map.of(), OperationMode.RUNNING))
582+
.indices(indices)
583+
)
594584
.nodes(
595585
DiscoveryNodes.builder()
596586
.localNodeId(nodeId)
@@ -613,8 +603,8 @@ public void testIndicesOnShuttingDownNodesInDangerousStep() {
613603
.build();
614604

615605
// No danger yet, because no node is shutting down
616-
assertThat(IndexLifecycleService.indicesOnShuttingDownNodesInDangerousStep(state, "regular_node"), equalTo(Set.of()));
617-
assertThat(IndexLifecycleService.indicesOnShuttingDownNodesInDangerousStep(state, "shutdown_node"), equalTo(Set.of()));
606+
assertThat(IndexLifecycleService.hasIndicesInDangerousStepForNodeShutdown(state, "regular_node"), equalTo(true));
607+
assertThat(IndexLifecycleService.hasIndicesInDangerousStepForNodeShutdown(state, "shutdown_node"), equalTo(true));
618608

619609
state = ClusterState.builder(state)
620610
.metadata(
@@ -638,12 +628,12 @@ public void testIndicesOnShuttingDownNodesInDangerousStep() {
638628
)
639629
.build();
640630

641-
assertThat(IndexLifecycleService.indicesOnShuttingDownNodesInDangerousStep(state, "regular_node"), equalTo(Set.of()));
631+
assertThat(IndexLifecycleService.hasIndicesInDangerousStepForNodeShutdown(state, "regular_node"), equalTo(true));
642632
// No danger, because this is a "RESTART" type shutdown
643633
assertThat(
644634
"restart type shutdowns are not considered dangerous",
645-
IndexLifecycleService.indicesOnShuttingDownNodesInDangerousStep(state, "shutdown_node"),
646-
equalTo(Set.of())
635+
IndexLifecycleService.hasIndicesInDangerousStepForNodeShutdown(state, "shutdown_node"),
636+
equalTo(true)
647637
);
648638

649639
final String targetNodeName = type == SingleNodeShutdownMetadata.Type.REPLACE ? randomAlphaOfLengthBetween(10, 20) : null;
@@ -673,7 +663,7 @@ public void testIndicesOnShuttingDownNodesInDangerousStep() {
673663
.build();
674664

675665
// The dangerous index should be calculated as being in danger now
676-
assertThat(IndexLifecycleService.indicesOnShuttingDownNodesInDangerousStep(state, "shutdown_node"), equalTo(Set.of("danger")));
666+
assertThat(IndexLifecycleService.hasIndicesInDangerousStepForNodeShutdown(state, "shutdown_node"), equalTo(false));
677667
}
678668
}
679669
}

0 commit comments

Comments
 (0)