Skip to content

Commit e955872

Browse files
authored
Make ILMHistoryStore semi-project-aware (#129933)
Ensures the history store is able to index documents into a non-default project, but it doesn't actually work with multiple concurrent projects.
1 parent 0299f4e commit e955872

File tree

6 files changed

+77
-44
lines changed

6 files changed

+77
-44
lines changed

x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/ilm/ExecuteStepsUpdateTask.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -221,9 +221,9 @@ public void onClusterStateProcessed(ProjectState newState) {
221221

222222
LifecycleExecutionState exState = indexMetadata.getLifecycleExecutionState();
223223
if (ErrorStep.NAME.equals(exState.step()) && this.failure != null) {
224-
lifecycleRunner.registerFailedOperation(indexMetadata, failure);
224+
lifecycleRunner.registerFailedOperation(projectId, indexMetadata, failure);
225225
} else {
226-
lifecycleRunner.registerSuccessfulOperation(indexMetadata);
226+
lifecycleRunner.registerSuccessfulOperation(projectId, indexMetadata);
227227
}
228228

229229
if (nextStepKey != null && nextStepKey != TerminalPolicyStep.KEY) {

x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/ilm/IndexLifecycle.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -153,7 +153,8 @@ public Collection<?> createComponents(PluginServices services) {
153153
new ILMHistoryStore(
154154
new OriginSettingClient(services.client(), INDEX_LIFECYCLE_ORIGIN),
155155
services.clusterService(),
156-
services.threadPool()
156+
services.threadPool(),
157+
services.projectResolver()
157158
)
158159
);
159160
/*

x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/ilm/IndexLifecycleRunner.java

Lines changed: 9 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -371,7 +371,7 @@ public void onResponse(Void unused) {
371371
// Delete needs special handling, because after this step we
372372
// will no longer have access to any information about the
373373
// index since it will be... deleted.
374-
registerDeleteOperation(indexMetadata);
374+
registerDeleteOperation(projectId, indexMetadata);
375375
}
376376
}
377377

@@ -479,7 +479,7 @@ private void moveToStep(ProjectId projectId, Index index, String policy, StepKey
479479
),
480480
new MoveToNextStepUpdateTask(projectId, index, policy, currentStepKey, newStepKey, nowSupplier, stepRegistry, state -> {
481481
IndexMetadata indexMetadata = state.metadata().index(index);
482-
registerSuccessfulOperation(indexMetadata);
482+
registerSuccessfulOperation(projectId, indexMetadata);
483483
if (newStepKey != null && newStepKey != TerminalPolicyStep.KEY && indexMetadata != null) {
484484
maybeRunAsyncAction(state, indexMetadata, policy, newStepKey);
485485
}
@@ -499,7 +499,7 @@ private void moveToErrorStep(ProjectId projectId, Index index, String policy, St
499499
Strings.format("ilm-move-to-error-step {policy [%s], index [%s], currentStep [%s]}", policy, index.getName(), currentStepKey),
500500
new MoveToErrorStepUpdateTask(projectId, index, policy, currentStepKey, e, nowSupplier, stepRegistry::getStep, state -> {
501501
IndexMetadata indexMetadata = state.metadata().index(index);
502-
registerFailedOperation(indexMetadata, e);
502+
registerFailedOperation(projectId, indexMetadata, e);
503503
})
504504
);
505505
}
@@ -556,13 +556,14 @@ private void markPolicyRetrievalError(
556556
* For the given index metadata, register (index a document) that the index has transitioned
557557
* successfully into this new state using the {@link ILMHistoryStore}
558558
*/
559-
void registerSuccessfulOperation(IndexMetadata indexMetadata) {
559+
void registerSuccessfulOperation(ProjectId projectId, IndexMetadata indexMetadata) {
560560
if (indexMetadata == null) {
561561
// This index may have been deleted and has no metadata, so ignore it
562562
return;
563563
}
564564
Long origination = calculateOriginationMillis(indexMetadata);
565565
ilmHistoryStore.putAsync(
566+
projectId,
566567
ILMHistoryItem.success(
567568
indexMetadata.getIndex().getName(),
568569
indexMetadata.getLifecyclePolicyName(),
@@ -577,12 +578,13 @@ void registerSuccessfulOperation(IndexMetadata indexMetadata) {
577578
* For the given index metadata, register (index a document) that the index
578579
* has been deleted by ILM using the {@link ILMHistoryStore}
579580
*/
580-
void registerDeleteOperation(IndexMetadata metadataBeforeDeletion) {
581+
void registerDeleteOperation(ProjectId projectId, IndexMetadata metadataBeforeDeletion) {
581582
if (metadataBeforeDeletion == null) {
582583
throw new IllegalStateException("cannot register deletion of an index that did not previously exist");
583584
}
584585
Long origination = calculateOriginationMillis(metadataBeforeDeletion);
585586
ilmHistoryStore.putAsync(
587+
projectId,
586588
ILMHistoryItem.success(
587589
metadataBeforeDeletion.getIndex().getName(),
588590
metadataBeforeDeletion.getLifecyclePolicyName(),
@@ -600,13 +602,14 @@ void registerDeleteOperation(IndexMetadata metadataBeforeDeletion) {
600602
* For the given index metadata, register (index a document) that the index has transitioned
601603
* into the ERROR state using the {@link ILMHistoryStore}
602604
*/
603-
void registerFailedOperation(IndexMetadata indexMetadata, Exception failure) {
605+
void registerFailedOperation(ProjectId projectId, IndexMetadata indexMetadata, Exception failure) {
604606
if (indexMetadata == null) {
605607
// This index may have been deleted and has no metadata, so ignore it
606608
return;
607609
}
608610
Long origination = calculateOriginationMillis(indexMetadata);
609611
ilmHistoryStore.putAsync(
612+
projectId,
610613
ILMHistoryItem.failure(
611614
indexMetadata.getIndex().getName(),
612615
indexMetadata.getLifecyclePolicyName(),

x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/ilm/history/ILMHistoryStore.java

Lines changed: 17 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -19,9 +19,12 @@
1919
import org.elasticsearch.action.index.IndexRequest;
2020
import org.elasticsearch.client.internal.Client;
2121
import org.elasticsearch.client.internal.OriginSettingClient;
22+
import org.elasticsearch.cluster.metadata.ProjectId;
23+
import org.elasticsearch.cluster.project.ProjectResolver;
2224
import org.elasticsearch.cluster.service.ClusterService;
2325
import org.elasticsearch.common.settings.Setting;
2426
import org.elasticsearch.common.unit.ByteSizeValue;
27+
import org.elasticsearch.core.NotMultiProjectCapable;
2528
import org.elasticsearch.core.TimeValue;
2629
import org.elasticsearch.threadpool.ThreadPool;
2730
import org.elasticsearch.xcontent.ToXContent;
@@ -66,27 +69,25 @@ public class ILMHistoryStore implements Closeable {
6669
);
6770

6871
private volatile boolean ilmHistoryEnabled = true;
72+
private final ProjectResolver projectResolver;
6973
private final BulkProcessor2 processor;
7074

71-
public ILMHistoryStore(Client client, ClusterService clusterService, ThreadPool threadPool) {
72-
this(client, clusterService, threadPool, ActionListener.noop(), TimeValue.timeValueSeconds(5));
75+
public ILMHistoryStore(Client client, ClusterService clusterService, ThreadPool threadPool, ProjectResolver projectResolver) {
76+
this(client, clusterService, threadPool, projectResolver, ActionListener.noop(), TimeValue.timeValueSeconds(5));
7377
}
7478

7579
/**
76-
* For unit testing, allows a more frequent flushInterval
77-
* @param client
78-
* @param clusterService
79-
* @param threadPool
80-
* @param listener
81-
* @param flushInterval
80+
* For unit testing, allows a more frequent flushInterval
8281
*/
8382
ILMHistoryStore(
8483
Client client,
8584
ClusterService clusterService,
8685
ThreadPool threadPool,
86+
ProjectResolver projectResolver,
8787
ActionListener<BulkResponse> listener,
8888
TimeValue flushInterval
8989
) {
90+
this.projectResolver = projectResolver;
9091
this.setIlmHistoryEnabled(LIFECYCLE_HISTORY_INDEX_ENABLED_SETTING.get(clusterService.getSettings()));
9192
clusterService.getClusterSettings().addSettingsUpdateConsumer(LIFECYCLE_HISTORY_INDEX_ENABLED_SETTING, this::setIlmHistoryEnabled);
9293

@@ -95,7 +96,8 @@ public ILMHistoryStore(Client client, ClusterService clusterService, ThreadPool
9596
new BulkProcessor2.Listener() {
9697
@Override
9798
public void beforeBulk(long executionId, BulkRequest request) {
98-
if (clusterService.state().getMetadata().getProject().templatesV2().containsKey(ILM_TEMPLATE_NAME) == false) {
99+
final var project = projectResolver.getProjectMetadata(clusterService.state());
100+
if (project.templatesV2().containsKey(ILM_TEMPLATE_NAME) == false) {
99101
ElasticsearchException e = new ElasticsearchException("no ILM history template");
100102
logger.warn(
101103
() -> format(
@@ -169,7 +171,8 @@ public void afterBulk(long executionId, BulkRequest request, Exception failure)
169171
/**
170172
* Attempts to asynchronously index an ILM history entry
171173
*/
172-
public void putAsync(ILMHistoryItem item) {
174+
@NotMultiProjectCapable(description = "See comment inside method")
175+
public void putAsync(ProjectId projectId, ILMHistoryItem item) {
173176
if (ilmHistoryEnabled == false) {
174177
logger.trace(
175178
"not recording ILM history item because [{}] is [false]: [{}]",
@@ -182,7 +185,10 @@ public void putAsync(ILMHistoryItem item) {
182185
try (XContentBuilder builder = XContentFactory.jsonBuilder()) {
183186
item.toXContent(builder, ToXContent.EMPTY_PARAMS);
184187
IndexRequest request = new IndexRequest(ILM_HISTORY_DATA_STREAM).source(builder).opType(DocWriteRequest.OpType.CREATE);
185-
processor.add(request);
188+
// Even though this looks project-aware, it's not really. The bulk processor flushes the history items at "arbitrary" moments,
189+
// meaning it will send bulk requests with history items of multiple projects, but the _bulk API will index everything into
190+
// the project of the last history item that came in.
191+
projectResolver.executeOnProject(projectId, () -> processor.add(request));
186192
} catch (Exception e) {
187193
logger.error(() -> format("failed to send ILM history item to index [%s]: [%s]", ILM_HISTORY_DATA_STREAM, item), e);
188194
}

x-pack/plugin/ilm/src/test/java/org/elasticsearch/xpack/ilm/IndexLifecycleRunnerTests.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -16,9 +16,11 @@
1616
import org.elasticsearch.cluster.ProjectState;
1717
import org.elasticsearch.cluster.metadata.IndexMetadata;
1818
import org.elasticsearch.cluster.metadata.LifecycleExecutionState;
19+
import org.elasticsearch.cluster.metadata.ProjectId;
1920
import org.elasticsearch.cluster.metadata.ProjectMetadata;
2021
import org.elasticsearch.cluster.node.DiscoveryNode;
2122
import org.elasticsearch.cluster.node.DiscoveryNodes;
23+
import org.elasticsearch.cluster.project.TestProjectResolvers;
2224
import org.elasticsearch.cluster.service.ClusterService;
2325
import org.elasticsearch.cluster.service.MasterServiceTaskQueue;
2426
import org.elasticsearch.common.Priority;
@@ -1266,15 +1268,15 @@ private static class NoOpHistoryStore extends ILMHistoryStore {
12661268
private final List<ILMHistoryItem> items = new CopyOnWriteArrayList<>();
12671269

12681270
NoOpHistoryStore(Client noopClient, ClusterService clusterService) {
1269-
super(noopClient, clusterService, clusterService.threadPool());
1271+
super(noopClient, clusterService, clusterService.threadPool(), TestProjectResolvers.alwaysThrow());
12701272
}
12711273

12721274
public List<ILMHistoryItem> getItems() {
12731275
return items;
12741276
}
12751277

12761278
@Override
1277-
public void putAsync(ILMHistoryItem item) {
1279+
public void putAsync(ProjectId projectId, ILMHistoryItem item) {
12781280
logger.info("--> adding ILM history item: [{}]", item);
12791281
items.add(item);
12801282
}

x-pack/plugin/ilm/src/test/java/org/elasticsearch/xpack/ilm/history/ILMHistoryStoreTests.java

Lines changed: 43 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,9 @@
2525
import org.elasticsearch.cluster.ClusterState;
2626
import org.elasticsearch.cluster.metadata.LifecycleExecutionState;
2727
import org.elasticsearch.cluster.metadata.Metadata;
28+
import org.elasticsearch.cluster.metadata.ProjectId;
29+
import org.elasticsearch.cluster.metadata.ProjectMetadata;
30+
import org.elasticsearch.cluster.project.TestProjectResolvers;
2831
import org.elasticsearch.cluster.service.ClusterService;
2932
import org.elasticsearch.common.TriFunction;
3033
import org.elasticsearch.common.settings.ClusterSettings;
@@ -65,6 +68,7 @@ public class ILMHistoryStoreTests extends ESTestCase {
6568
private VerifyingClient client;
6669
private ClusterService clusterService;
6770
private ILMHistoryStore historyStore;
71+
private ProjectId projectId;
6872

6973
@Before
7074
public void setup() {
@@ -83,13 +87,21 @@ public void setup() {
8387
NamedXContentRegistry.EMPTY
8488
);
8589
ClusterState state = clusterService.state();
90+
projectId = randomProjectIdOrDefault();
8691
ClusterServiceUtils.setState(
8792
clusterService,
8893
ClusterState.builder(state)
89-
.metadata(Metadata.builder(state.metadata()).indexTemplates(registry.getComposableTemplateConfigs()))
94+
.putProjectMetadata(ProjectMetadata.builder(projectId).indexTemplates(registry.getComposableTemplateConfigs()))
9095
.build()
9196
);
92-
historyStore = new ILMHistoryStore(client, clusterService, threadPool, ActionListener.noop(), TimeValue.timeValueMillis(500));
97+
historyStore = new ILMHistoryStore(
98+
client,
99+
clusterService,
100+
threadPool,
101+
TestProjectResolvers.usingRequestHeader(threadPool.getThreadContext()),
102+
ActionListener.noop(),
103+
TimeValue.timeValueMillis(500)
104+
);
93105
}
94106

95107
@After
@@ -115,7 +127,7 @@ public void testNoActionIfDisabled() throws Exception {
115127
latch.countDown();
116128
return null;
117129
});
118-
historyStore.putAsync(record);
130+
historyStore.putAsync(projectId, record);
119131
assertFalse(latch.await(2, TimeUnit.SECONDS));
120132
}
121133

@@ -156,7 +168,7 @@ public void testPut() throws Exception {
156168
);
157169
});
158170

159-
historyStore.putAsync(record);
171+
historyStore.putAsync(projectId, record);
160172
assertBusy(() -> assertThat(calledTimes.get(), equalTo(1)));
161173
}
162174

@@ -207,7 +219,7 @@ public void testPut() throws Exception {
207219
);
208220
});
209221

210-
historyStore.putAsync(record);
222+
historyStore.putAsync(projectId, record);
211223
assertBusy(() -> assertThat(calledTimes.get(), equalTo(1)));
212224
}
213225
}
@@ -255,23 +267,32 @@ public void testMultipleFlushes() throws Exception {
255267
);
256268
return bulkItemResponse;
257269
});
258-
try (ILMHistoryStore localHistoryStore = new ILMHistoryStore(client, clusterService, threadPool, new ActionListener<>() {
259-
@Override
260-
public void onResponse(BulkResponse response) {
261-
int itemsInResponse = response.getItems().length;
262-
actions.addAndGet(itemsInResponse);
263-
for (int i = 0; i < itemsInResponse; i++) {
264-
latch.countDown();
265-
}
266-
logger.info("cumulative responses: {}", actions.get());
267-
}
270+
try (
271+
ILMHistoryStore localHistoryStore = new ILMHistoryStore(
272+
client,
273+
clusterService,
274+
threadPool,
275+
TestProjectResolvers.usingRequestHeader(threadPool.getThreadContext()),
276+
new ActionListener<>() {
277+
@Override
278+
public void onResponse(BulkResponse response) {
279+
int itemsInResponse = response.getItems().length;
280+
actions.addAndGet(itemsInResponse);
281+
for (int i = 0; i < itemsInResponse; i++) {
282+
latch.countDown();
283+
}
284+
logger.info("cumulative responses: {}", actions.get());
285+
}
268286

269-
@Override
270-
public void onFailure(Exception e) {
271-
logger.error(e);
272-
fail(e.getMessage());
273-
}
274-
}, TimeValue.timeValueMillis(randomIntBetween(50, 1000)))) {
287+
@Override
288+
public void onFailure(Exception e) {
289+
logger.error(e);
290+
fail(e.getMessage());
291+
}
292+
},
293+
TimeValue.timeValueMillis(randomIntBetween(50, 1000))
294+
)
295+
) {
275296
for (int i = 0; i < numberOfDocs; i++) {
276297
ILMHistoryItem record1 = ILMHistoryItem.success(
277298
"index",
@@ -280,7 +301,7 @@ public void onFailure(Exception e) {
280301
10L,
281302
LifecycleExecutionState.builder().setPhase("phase").build()
282303
);
283-
localHistoryStore.putAsync(record1);
304+
localHistoryStore.putAsync(projectId, record1);
284305
}
285306
latch.await(5, TimeUnit.SECONDS);
286307
assertThat(actions.get(), equalTo(numberOfDocs));

0 commit comments

Comments
 (0)