Skip to content

Commit bbfa270

Browse files
authored
Split put lifecycle operation logic into a metadata service (#132658)
Splits the logic for updating the cluster state when adding an ILM policy into its own metadata service object which can now be called directly instead of requiring calls to the TransportPutLifecycleAction, allowing it to be reused in other places.
1 parent 422db0d commit bbfa270

File tree

5 files changed

+349
-297
lines changed

5 files changed

+349
-297
lines changed

x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/ilm/IndexLifecycle.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -141,6 +141,16 @@ protected XPackLicenseState getLicenseState() {
141141
@Override
142142
public Collection<?> createComponents(PluginServices services) {
143143
final List<Object> components = new ArrayList<>();
144+
PutLifecycleMetadataService putLifecycleMetadataService = new PutLifecycleMetadataService(
145+
services.clusterService(),
146+
services.xContentRegistry(),
147+
services.client(),
148+
getLicenseState(),
149+
services.threadPool(),
150+
services.projectResolver()
151+
);
152+
components.add(putLifecycleMetadataService);
153+
144154
ILMHistoryTemplateRegistry ilmTemplateRegistry = new ILMHistoryTemplateRegistry(
145155
settings,
146156
services.clusterService(),
Lines changed: 324 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,324 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the Elastic License
4+
* 2.0; you may not use this file except in compliance with the Elastic License
5+
* 2.0.
6+
*/
7+
8+
package org.elasticsearch.xpack.ilm;
9+
10+
import org.apache.logging.log4j.LogManager;
11+
import org.apache.logging.log4j.Logger;
12+
import org.elasticsearch.action.ActionListener;
13+
import org.elasticsearch.action.support.master.AcknowledgedResponse;
14+
import org.elasticsearch.client.internal.Client;
15+
import org.elasticsearch.cluster.AckedClusterStateUpdateTask;
16+
import org.elasticsearch.cluster.ClusterState;
17+
import org.elasticsearch.cluster.ClusterStateAckListener;
18+
import org.elasticsearch.cluster.ClusterStateUpdateTask;
19+
import org.elasticsearch.cluster.ProjectState;
20+
import org.elasticsearch.cluster.SimpleBatchedAckListenerTaskExecutor;
21+
import org.elasticsearch.cluster.metadata.ProjectId;
22+
import org.elasticsearch.cluster.metadata.ProjectMetadata;
23+
import org.elasticsearch.cluster.metadata.RepositoriesMetadata;
24+
import org.elasticsearch.cluster.project.ProjectResolver;
25+
import org.elasticsearch.cluster.service.ClusterService;
26+
import org.elasticsearch.cluster.service.MasterServiceTaskQueue;
27+
import org.elasticsearch.common.Priority;
28+
import org.elasticsearch.core.Nullable;
29+
import org.elasticsearch.core.SuppressForbidden;
30+
import org.elasticsearch.core.Tuple;
31+
import org.elasticsearch.license.XPackLicenseState;
32+
import org.elasticsearch.reservedstate.ReservedClusterStateHandler;
33+
import org.elasticsearch.threadpool.ThreadPool;
34+
import org.elasticsearch.xcontent.NamedXContentRegistry;
35+
import org.elasticsearch.xpack.core.ClientHelper;
36+
import org.elasticsearch.xpack.core.ilm.IndexLifecycleMetadata;
37+
import org.elasticsearch.xpack.core.ilm.LifecyclePolicy;
38+
import org.elasticsearch.xpack.core.ilm.LifecyclePolicyMetadata;
39+
import org.elasticsearch.xpack.core.ilm.Phase;
40+
import org.elasticsearch.xpack.core.ilm.SearchableSnapshotAction;
41+
import org.elasticsearch.xpack.core.ilm.WaitForSnapshotAction;
42+
import org.elasticsearch.xpack.core.ilm.action.PutLifecycleRequest;
43+
import org.elasticsearch.xpack.core.slm.SnapshotLifecycleMetadata;
44+
import org.elasticsearch.xpack.ilm.action.ReservedLifecycleAction;
45+
46+
import java.time.Instant;
47+
import java.util.Collections;
48+
import java.util.List;
49+
import java.util.Map;
50+
import java.util.SortedMap;
51+
import java.util.TreeMap;
52+
53+
import static org.elasticsearch.xpack.core.ilm.LifecycleOperationMetadata.currentILMMode;
54+
import static org.elasticsearch.xpack.core.ilm.PhaseCacheManagement.updateIndicesForPolicy;
55+
import static org.elasticsearch.xpack.core.searchablesnapshots.SearchableSnapshotsConstants.SEARCHABLE_SNAPSHOT_FEATURE;
56+
57+
public class PutLifecycleMetadataService {
58+
59+
private static final Logger logger = LogManager.getLogger(PutLifecycleMetadataService.class);
60+
61+
private final ClusterService clusterService;
62+
private final NamedXContentRegistry xContentRegistry;
63+
private final Client client;
64+
private final XPackLicenseState licenseState;
65+
private final ThreadPool threadPool;
66+
private final ProjectResolver projectResolver;
67+
private final MasterServiceTaskQueue<UpdateLifecyclePolicyTask> taskQueue;
68+
69+
public PutLifecycleMetadataService(
70+
ClusterService clusterService,
71+
NamedXContentRegistry xContentRegistry,
72+
Client client,
73+
XPackLicenseState licenseState,
74+
ThreadPool threadPool,
75+
ProjectResolver projectResolver
76+
) {
77+
this.clusterService = clusterService;
78+
this.xContentRegistry = xContentRegistry;
79+
this.client = client;
80+
this.licenseState = licenseState;
81+
this.threadPool = threadPool;
82+
this.projectResolver = projectResolver;
83+
this.taskQueue = clusterService.createTaskQueue("ilm-put-lifecycle-queue", Priority.NORMAL, new IlmLifecycleExecutor());
84+
}
85+
86+
@SuppressForbidden(reason = "legacy usage of unbatched task") // TODO add support for batching here
87+
private void submitUnbatchedTask(@SuppressWarnings("SameParameterValue") String source, ClusterStateUpdateTask task) {
88+
clusterService.submitUnbatchedStateUpdateTask(source, task);
89+
}
90+
91+
public void addLifecycle(PutLifecycleRequest request, ClusterState state, ActionListener<AcknowledgedResponse> listener) {
92+
// headers from the thread context stored by the AuthenticationService to be shared between the
93+
// REST layer and the Transport layer here must be accessed within this thread and not in the
94+
// cluster state thread in the ClusterStateUpdateTask below since that thread does not share the
95+
// same context, and therefore does not have access to the appropriate security headers.
96+
Map<String, String> filteredHeaders = ClientHelper.getPersistableSafeSecurityHeaders(threadPool.getThreadContext(), state);
97+
98+
LifecyclePolicy.validatePolicyName(request.getPolicy().getName());
99+
request.getPolicy().maybeAddDeprecationWarningForFreezeAction(request.getPolicy().getName());
100+
ProjectMetadata projectMetadata = projectResolver.getProjectMetadata(state);
101+
{
102+
IndexLifecycleMetadata lifecycleMetadata = projectMetadata.custom(IndexLifecycleMetadata.TYPE, IndexLifecycleMetadata.EMPTY);
103+
LifecyclePolicyMetadata existingPolicy = lifecycleMetadata.getPolicyMetadatas().get(request.getPolicy().getName());
104+
// Make the request a no-op if the policy and filtered headers match exactly
105+
if (isNoopUpdate(existingPolicy, request.getPolicy(), filteredHeaders)) {
106+
listener.onResponse(AcknowledgedResponse.TRUE);
107+
return;
108+
}
109+
}
110+
111+
UpdateLifecyclePolicyTask putTask = new UpdateLifecyclePolicyTask(
112+
projectMetadata.id(),
113+
request,
114+
listener,
115+
licenseState,
116+
filteredHeaders,
117+
xContentRegistry,
118+
client
119+
);
120+
taskQueue.submitTask("put-lifecycle-" + request.getPolicy().getName(), putTask, putTask.timeout());
121+
}
122+
123+
/**
124+
* Returns 'true' if the ILM policy is effectually the same (same policy and headers), and thus can be a no-op update.
125+
*/
126+
public static boolean isNoopUpdate(
127+
@Nullable LifecyclePolicyMetadata existingPolicy,
128+
LifecyclePolicy newPolicy,
129+
Map<String, String> filteredHeaders
130+
) {
131+
if (existingPolicy == null) {
132+
return false;
133+
} else {
134+
return newPolicy.equals(existingPolicy.getPolicy()) && filteredHeaders.equals(existingPolicy.getHeaders());
135+
}
136+
}
137+
138+
/**
139+
* Validate that the license level is compliant for searchable-snapshots, that any referenced snapshot
140+
* repositories exist, and that any referenced SLM policies exist.
141+
*
142+
* @param policy The lifecycle policy
143+
* @param state The cluster state
144+
*/
145+
private static void validatePrerequisites(LifecyclePolicy policy, ProjectState state, XPackLicenseState licenseState) {
146+
List<Phase> phasesWithSearchableSnapshotActions = policy.getPhases()
147+
.values()
148+
.stream()
149+
.filter(phase -> phase.getActions().containsKey(SearchableSnapshotAction.NAME))
150+
.toList();
151+
// check license level for searchable snapshots
152+
if (phasesWithSearchableSnapshotActions.isEmpty() == false
153+
&& SEARCHABLE_SNAPSHOT_FEATURE.checkWithoutTracking(licenseState) == false) {
154+
throw new IllegalArgumentException(
155+
"policy ["
156+
+ policy.getName()
157+
+ "] defines the ["
158+
+ SearchableSnapshotAction.NAME
159+
+ "] action but the current license is non-compliant for [searchable-snapshots]"
160+
);
161+
}
162+
// make sure any referenced snapshot repositories exist
163+
for (Phase phase : phasesWithSearchableSnapshotActions) {
164+
SearchableSnapshotAction action = (SearchableSnapshotAction) phase.getActions().get(SearchableSnapshotAction.NAME);
165+
String repository = action.getSnapshotRepository();
166+
if (RepositoriesMetadata.get(state.cluster()).repository(repository) == null) {
167+
throw new IllegalArgumentException(
168+
"no such repository ["
169+
+ repository
170+
+ "], the snapshot repository "
171+
+ "referenced by the ["
172+
+ SearchableSnapshotAction.NAME
173+
+ "] action in the ["
174+
+ phase.getName()
175+
+ "] phase "
176+
+ "must exist before it can be referenced by an ILM policy"
177+
);
178+
}
179+
}
180+
181+
List<Phase> phasesWithWaitForSnapshotActions = policy.getPhases()
182+
.values()
183+
.stream()
184+
.filter(phase -> phase.getActions().containsKey(WaitForSnapshotAction.NAME))
185+
.toList();
186+
// make sure any referenced snapshot lifecycle policies exist
187+
for (Phase phase : phasesWithWaitForSnapshotActions) {
188+
WaitForSnapshotAction action = (WaitForSnapshotAction) phase.getActions().get(WaitForSnapshotAction.NAME);
189+
String slmPolicy = action.getPolicy();
190+
if (state.metadata()
191+
.custom(SnapshotLifecycleMetadata.TYPE, SnapshotLifecycleMetadata.EMPTY)
192+
.getSnapshotConfigurations()
193+
.get(slmPolicy) == null) {
194+
throw new IllegalArgumentException(
195+
"no such snapshot lifecycle policy ["
196+
+ slmPolicy
197+
+ "], the snapshot lifecycle policy "
198+
+ "referenced by the ["
199+
+ WaitForSnapshotAction.NAME
200+
+ "] action in the ["
201+
+ phase.getName()
202+
+ "] phase "
203+
+ "must exist before it can be referenced by an ILM policy"
204+
);
205+
}
206+
}
207+
}
208+
209+
public static class UpdateLifecyclePolicyTask extends AckedClusterStateUpdateTask {
210+
private final ProjectId projectId;
211+
private final PutLifecycleRequest request;
212+
private final XPackLicenseState licenseState;
213+
private final Map<String, String> filteredHeaders;
214+
private final NamedXContentRegistry xContentRegistry;
215+
private final Client client;
216+
private final boolean verboseLogging;
217+
218+
public UpdateLifecyclePolicyTask(
219+
ProjectId projectId,
220+
PutLifecycleRequest request,
221+
ActionListener<AcknowledgedResponse> listener,
222+
XPackLicenseState licenseState,
223+
Map<String, String> filteredHeaders,
224+
NamedXContentRegistry xContentRegistry,
225+
Client client
226+
) {
227+
super(request, listener);
228+
this.projectId = projectId;
229+
this.request = request;
230+
this.licenseState = licenseState;
231+
this.filteredHeaders = filteredHeaders;
232+
this.xContentRegistry = xContentRegistry;
233+
this.client = client;
234+
this.verboseLogging = true;
235+
}
236+
237+
/**
238+
* Used by the {@link ReservedClusterStateHandler} for ILM
239+
* {@link ReservedLifecycleAction}
240+
* <p>
241+
* It disables verbose logging and has no filtered headers.
242+
*/
243+
public UpdateLifecyclePolicyTask(
244+
ProjectId projectId,
245+
PutLifecycleRequest request,
246+
XPackLicenseState licenseState,
247+
NamedXContentRegistry xContentRegistry,
248+
Client client
249+
) {
250+
super(request, null);
251+
this.projectId = projectId;
252+
this.request = request;
253+
this.licenseState = licenseState;
254+
this.filteredHeaders = Collections.emptyMap();
255+
this.xContentRegistry = xContentRegistry;
256+
this.client = client;
257+
this.verboseLogging = false;
258+
}
259+
260+
@Override
261+
public ClusterState execute(ClusterState currentState) throws Exception {
262+
var projectState = currentState.projectState(projectId);
263+
final IndexLifecycleMetadata currentMetadata = projectState.metadata()
264+
.custom(IndexLifecycleMetadata.TYPE, IndexLifecycleMetadata.EMPTY);
265+
final LifecyclePolicyMetadata existingPolicyMetadata = currentMetadata.getPolicyMetadatas().get(request.getPolicy().getName());
266+
267+
// Double-check for no-op in the state update task, in case it was changed/reset in the meantime
268+
if (isNoopUpdate(existingPolicyMetadata, request.getPolicy(), filteredHeaders)) {
269+
return currentState;
270+
}
271+
272+
validatePrerequisites(request.getPolicy(), projectState, licenseState);
273+
274+
long nextVersion = (existingPolicyMetadata == null) ? 1L : existingPolicyMetadata.getVersion() + 1L;
275+
SortedMap<String, LifecyclePolicyMetadata> newPolicies = new TreeMap<>(currentMetadata.getPolicyMetadatas());
276+
LifecyclePolicyMetadata lifecyclePolicyMetadata = new LifecyclePolicyMetadata(
277+
request.getPolicy(),
278+
filteredHeaders,
279+
nextVersion,
280+
Instant.now().toEpochMilli()
281+
);
282+
LifecyclePolicyMetadata oldPolicy = newPolicies.put(lifecyclePolicyMetadata.getName(), lifecyclePolicyMetadata);
283+
if (verboseLogging) {
284+
if (oldPolicy == null) {
285+
logger.info("adding index lifecycle policy [{}]", request.getPolicy().getName());
286+
} else {
287+
logger.info("updating index lifecycle policy [{}]", request.getPolicy().getName());
288+
}
289+
}
290+
IndexLifecycleMetadata newMetadata = new IndexLifecycleMetadata(newPolicies, currentILMMode(projectState.metadata()));
291+
ProjectMetadata newProjectMetadata = ProjectMetadata.builder(projectState.metadata())
292+
.putCustom(IndexLifecycleMetadata.TYPE, newMetadata)
293+
.build();
294+
ClusterState nonRefreshedState = ClusterState.builder(currentState).putProjectMetadata(newProjectMetadata).build();
295+
if (oldPolicy == null) {
296+
return nonRefreshedState;
297+
} else {
298+
try {
299+
ProjectMetadata refreshedProjectMetadata = updateIndicesForPolicy(
300+
newProjectMetadata,
301+
xContentRegistry,
302+
client,
303+
oldPolicy.getPolicy(),
304+
lifecyclePolicyMetadata,
305+
licenseState
306+
);
307+
return ClusterState.builder(currentState).putProjectMetadata(refreshedProjectMetadata).build();
308+
} catch (Exception e) {
309+
logger.warn(() -> "unable to refresh indices phase JSON for updated policy [" + oldPolicy.getName() + "]", e);
310+
// Revert to the non-refreshed state
311+
return nonRefreshedState;
312+
}
313+
}
314+
}
315+
}
316+
317+
private static class IlmLifecycleExecutor extends SimpleBatchedAckListenerTaskExecutor<UpdateLifecyclePolicyTask> {
318+
@Override
319+
public Tuple<ClusterState, ClusterStateAckListener> executeTask(UpdateLifecyclePolicyTask task, ClusterState clusterState)
320+
throws Exception {
321+
return Tuple.tuple(task.execute(clusterState), task);
322+
}
323+
}
324+
}

x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/ilm/action/ReservedLifecycleAction.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import org.elasticsearch.xpack.core.ilm.LifecyclePolicy;
2323
import org.elasticsearch.xpack.core.ilm.action.DeleteLifecycleAction;
2424
import org.elasticsearch.xpack.core.ilm.action.PutLifecycleRequest;
25+
import org.elasticsearch.xpack.ilm.PutLifecycleMetadataService;
2526

2627
import java.io.IOException;
2728
import java.util.ArrayList;
@@ -86,7 +87,7 @@ public TransformState transform(ProjectId projectId, List<LifecyclePolicy> sourc
8687
ClusterState state = prevState.state();
8788

8889
for (var request : requests) {
89-
TransportPutLifecycleAction.UpdateLifecyclePolicyTask task = new TransportPutLifecycleAction.UpdateLifecyclePolicyTask(
90+
PutLifecycleMetadataService.UpdateLifecyclePolicyTask task = new PutLifecycleMetadataService.UpdateLifecyclePolicyTask(
9091
state.metadata().getProject(projectId).id(),
9192
request,
9293
licenseState,

0 commit comments

Comments
 (0)