@@ -221,9 +221,9 @@ public void onClusterStateProcessed(ProjectState newState) {

 LifecycleExecutionState exState = indexMetadata.getLifecycleExecutionState();
 if (ErrorStep.NAME.equals(exState.step()) && this.failure != null) {
-lifecycleRunner.registerFailedOperation(indexMetadata, failure);
+lifecycleRunner.registerFailedOperation(projectId, indexMetadata, failure);
 } else {
-lifecycleRunner.registerSuccessfulOperation(indexMetadata);
+lifecycleRunner.registerSuccessfulOperation(projectId, indexMetadata);
 }

 if (nextStepKey != null && nextStepKey != TerminalPolicyStep.KEY) {
@@ -153,7 +153,8 @@ public Collection<?> createComponents(PluginServices services) {
 new ILMHistoryStore(
 new OriginSettingClient(services.client(), INDEX_LIFECYCLE_ORIGIN),
 services.clusterService(),
-services.threadPool()
+services.threadPool(),
+services.projectResolver()
 )
 );
 /*
@@ -371,7 +371,7 @@ public void onResponse(Void unused) {
 // Delete needs special handling, because after this step we
 // will no longer have access to any information about the
 // index since it will be... deleted.
-registerDeleteOperation(indexMetadata);
+registerDeleteOperation(projectId, indexMetadata);
 }
 }

@@ -479,7 +479,7 @@ private void moveToStep(ProjectId projectId, Index index, String policy, StepKey
 ),
 new MoveToNextStepUpdateTask(projectId, index, policy, currentStepKey, newStepKey, nowSupplier, stepRegistry, state -> {
 IndexMetadata indexMetadata = state.metadata().index(index);
-registerSuccessfulOperation(indexMetadata);
+registerSuccessfulOperation(projectId, indexMetadata);
 if (newStepKey != null && newStepKey != TerminalPolicyStep.KEY && indexMetadata != null) {
 maybeRunAsyncAction(state, indexMetadata, policy, newStepKey);
 }
@@ -499,7 +499,7 @@ private void moveToErrorStep(ProjectId projectId, Index index, String policy, St
 Strings.format("ilm-move-to-error-step {policy [%s], index [%s], currentStep [%s]}", policy, index.getName(), currentStepKey),
 new MoveToErrorStepUpdateTask(projectId, index, policy, currentStepKey, e, nowSupplier, stepRegistry::getStep, state -> {
 IndexMetadata indexMetadata = state.metadata().index(index);
-registerFailedOperation(indexMetadata, e);
+registerFailedOperation(projectId, indexMetadata, e);
 })
 );
 }
@@ -556,13 +556,14 @@ private void markPolicyRetrievalError(
 * For the given index metadata, register (index a document) that the index has transitioned
 * successfully into this new state using the {@link ILMHistoryStore}
 */
-void registerSuccessfulOperation(IndexMetadata indexMetadata) {
+void registerSuccessfulOperation(ProjectId projectId, IndexMetadata indexMetadata) {
 if (indexMetadata == null) {
 // This index may have been deleted and has no metadata, so ignore it
 return;
 }
 Long origination = calculateOriginationMillis(indexMetadata);
 ilmHistoryStore.putAsync(
+projectId,
 ILMHistoryItem.success(
 indexMetadata.getIndex().getName(),
 indexMetadata.getLifecyclePolicyName(),
@@ -577,12 +578,13 @@ void registerSuccessfulOperation(IndexMetadata indexMetadata) {
 * For the given index metadata, register (index a document) that the index
 * has been deleted by ILM using the {@link ILMHistoryStore}
 */
-void registerDeleteOperation(IndexMetadata metadataBeforeDeletion) {
+void registerDeleteOperation(ProjectId projectId, IndexMetadata metadataBeforeDeletion) {
 if (metadataBeforeDeletion == null) {
 throw new IllegalStateException("cannot register deletion of an index that did not previously exist");
 }
 Long origination = calculateOriginationMillis(metadataBeforeDeletion);
 ilmHistoryStore.putAsync(
+projectId,
 ILMHistoryItem.success(
 metadataBeforeDeletion.getIndex().getName(),
 metadataBeforeDeletion.getLifecyclePolicyName(),
@@ -600,13 +602,14 @@ void registerDeleteOperation(IndexMetadata metadataBeforeDeletion) {
 * For the given index metadata, register (index a document) that the index has transitioned
 * into the ERROR state using the {@link ILMHistoryStore}
 */
-void registerFailedOperation(IndexMetadata indexMetadata, Exception failure) {
+void registerFailedOperation(ProjectId projectId, IndexMetadata indexMetadata, Exception failure) {
 if (indexMetadata == null) {
 // This index may have been deleted and has no metadata, so ignore it
 return;
 }
 Long origination = calculateOriginationMillis(indexMetadata);
 ilmHistoryStore.putAsync(
+projectId,
 ILMHistoryItem.failure(
 indexMetadata.getIndex().getName(),
 indexMetadata.getLifecyclePolicyName(),
@@ -19,9 +19,12 @@
 import org.elasticsearch.action.index.IndexRequest;
 import org.elasticsearch.client.internal.Client;
 import org.elasticsearch.client.internal.OriginSettingClient;
+import org.elasticsearch.cluster.metadata.ProjectId;
+import org.elasticsearch.cluster.project.ProjectResolver;
 import org.elasticsearch.cluster.service.ClusterService;
 import org.elasticsearch.common.settings.Setting;
 import org.elasticsearch.common.unit.ByteSizeValue;
+import org.elasticsearch.core.NotMultiProjectCapable;
 import org.elasticsearch.core.TimeValue;
 import org.elasticsearch.threadpool.ThreadPool;
 import org.elasticsearch.xcontent.ToXContent;
@@ -66,27 +69,25 @@ public class ILMHistoryStore implements Closeable {
 );

 private volatile boolean ilmHistoryEnabled = true;
+private final ProjectResolver projectResolver;
 private final BulkProcessor2 processor;

-public ILMHistoryStore(Client client, ClusterService clusterService, ThreadPool threadPool) {
-this(client, clusterService, threadPool, ActionListener.noop(), TimeValue.timeValueSeconds(5));
+public ILMHistoryStore(Client client, ClusterService clusterService, ThreadPool threadPool, ProjectResolver projectResolver) {
+this(client, clusterService, threadPool, projectResolver, ActionListener.noop(), TimeValue.timeValueSeconds(5));
 }

 /**
-* For unit testing, allows a more frequent flushInterval
-* @param client
-* @param clusterService
-* @param threadPool
-* @param listener
-* @param flushInterval
+* For unit testing, allows a more frequent flushInterval
 */
 ILMHistoryStore(
 Client client,
 ClusterService clusterService,
 ThreadPool threadPool,
+ProjectResolver projectResolver,
 ActionListener<BulkResponse> listener,
 TimeValue flushInterval
 ) {
+this.projectResolver = projectResolver;
 this.setIlmHistoryEnabled(LIFECYCLE_HISTORY_INDEX_ENABLED_SETTING.get(clusterService.getSettings()));
 clusterService.getClusterSettings().addSettingsUpdateConsumer(LIFECYCLE_HISTORY_INDEX_ENABLED_SETTING, this::setIlmHistoryEnabled);

@@ -95,7 +96,8 @@ public ILMHistoryStore(Client client, ClusterService clusterService, ThreadPool
 new BulkProcessor2.Listener() {
 @Override
 public void beforeBulk(long executionId, BulkRequest request) {
-if (clusterService.state().getMetadata().getProject().templatesV2().containsKey(ILM_TEMPLATE_NAME) == false) {
+final var project = projectResolver.getProjectMetadata(clusterService.state());
+if (project.templatesV2().containsKey(ILM_TEMPLATE_NAME) == false) {
 ElasticsearchException e = new ElasticsearchException("no ILM history template");
 logger.warn(
 () -> format(
@@ -169,7 +171,8 @@ public void afterBulk(long executionId, BulkRequest request, Exception failure)
 /**
 * Attempts to asynchronously index an ILM history entry
 */
-public void putAsync(ILMHistoryItem item) {
+@NotMultiProjectCapable(description = "See comment inside method")
+public void putAsync(ProjectId projectId, ILMHistoryItem item) {
 if (ilmHistoryEnabled == false) {
 logger.trace(
 "not recording ILM history item because [{}] is [false]: [{}]",
@@ -182,7 +185,10 @@ public void putAsync(ILMHistoryItem item) {
 try (XContentBuilder builder = XContentFactory.jsonBuilder()) {
 item.toXContent(builder, ToXContent.EMPTY_PARAMS);
 IndexRequest request = new IndexRequest(ILM_HISTORY_DATA_STREAM).source(builder).opType(DocWriteRequest.OpType.CREATE);
-processor.add(request);
+// Even though this looks project-aware, it's not really. The bulk processor flushes the history items at "arbitrary" moments,
+// meaning it will send bulk requests with history items of multiple projects, but the _bulk API will index everything into
+// the project of the last history item that came in.
Comment on lines +188 to +190

Contributor (author): I plan to add an annotation and use it here before I merge this PR.

Member: I guess the same conversation as on the other PR about assertions applies here?
+projectResolver.executeOnProject(projectId, () -> processor.add(request));
 } catch (Exception e) {
 logger.error(() -> format("failed to send ILM history item to index [%s]: [%s]", ILM_HISTORY_DATA_STREAM, item), e);
 }
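The limitation described in the inline comment above (and flagged by the @NotMultiProjectCapable annotation) is easier to see in isolation. The following is a minimal, hypothetical Java sketch, not Elasticsearch code: ProjectUnawareBatcher and HistoryDoc are invented names standing in for BulkProcessor2, the project-scoped thread context, and the _bulk API. It assumes a buffer that records a per-item project id but resolves the target project only once per flush, so whichever project context is active at flush time wins for the entire batch.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical illustration only; none of these types exist in Elasticsearch.
public class ProjectUnawareBatcher {

    record HistoryDoc(String projectId, String payload) {}

    private final List<HistoryDoc> buffer = new ArrayList<>();
    private String activeProjectContext = "default";

    // Each put records the item's project id, but also overwrites the single
    // "current project" context, so the last caller before a flush wins.
    void putAsync(String projectId, String payload) {
        activeProjectContext = projectId;
        buffer.add(new HistoryDoc(projectId, payload));
    }

    // The flush sends the whole batch under one project context, ignoring the
    // per-item project ids, analogous to one _bulk request carrying mixed items.
    void flush() {
        System.out.println("flushing " + buffer.size() + " items into project [" + activeProjectContext + "]");
        for (HistoryDoc doc : buffer) {
            if (doc.projectId().equals(activeProjectContext) == false) {
                System.out.println("  misrouted: " + doc + " was indexed under [" + activeProjectContext + "]");
            }
        }
        buffer.clear();
    }

    public static void main(String[] args) {
        ProjectUnawareBatcher batcher = new ProjectUnawareBatcher();
        batcher.putAsync("project-a", "ilm-history-item-1");
        batcher.putAsync("project-b", "ilm-history-item-2"); // project-b becomes the active context
        batcher.flush(); // both items end up in project-b
    }
}
```

The sketch only illustrates why the putAsync signature change on its own does not make the history store multi-project safe, which is the point of the annotation and of the review thread above.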
@@ -16,9 +16,11 @@
 import org.elasticsearch.cluster.ProjectState;
 import org.elasticsearch.cluster.metadata.IndexMetadata;
 import org.elasticsearch.cluster.metadata.LifecycleExecutionState;
+import org.elasticsearch.cluster.metadata.ProjectId;
 import org.elasticsearch.cluster.metadata.ProjectMetadata;
 import org.elasticsearch.cluster.node.DiscoveryNode;
 import org.elasticsearch.cluster.node.DiscoveryNodes;
+import org.elasticsearch.cluster.project.TestProjectResolvers;
 import org.elasticsearch.cluster.service.ClusterService;
 import org.elasticsearch.cluster.service.MasterServiceTaskQueue;
 import org.elasticsearch.common.Priority;
@@ -1266,15 +1268,15 @@ private static class NoOpHistoryStore extends ILMHistoryStore {
 private final List<ILMHistoryItem> items = new CopyOnWriteArrayList<>();

 NoOpHistoryStore(Client noopClient, ClusterService clusterService) {
-super(noopClient, clusterService, clusterService.threadPool());
+super(noopClient, clusterService, clusterService.threadPool(), TestProjectResolvers.alwaysThrow());
 }

 public List<ILMHistoryItem> getItems() {
 return items;
 }

 @Override
-public void putAsync(ILMHistoryItem item) {
+public void putAsync(ProjectId projectId, ILMHistoryItem item) {
 logger.info("--> adding ILM history item: [{}]", item);
 items.add(item);
 }
@@ -25,6 +25,9 @@
 import org.elasticsearch.cluster.ClusterState;
 import org.elasticsearch.cluster.metadata.LifecycleExecutionState;
 import org.elasticsearch.cluster.metadata.Metadata;
+import org.elasticsearch.cluster.metadata.ProjectId;
+import org.elasticsearch.cluster.metadata.ProjectMetadata;
+import org.elasticsearch.cluster.project.TestProjectResolvers;
 import org.elasticsearch.cluster.service.ClusterService;
 import org.elasticsearch.common.TriFunction;
 import org.elasticsearch.common.settings.ClusterSettings;
@@ -65,6 +68,7 @@ public class ILMHistoryStoreTests extends ESTestCase {
 private VerifyingClient client;
 private ClusterService clusterService;
 private ILMHistoryStore historyStore;
+private ProjectId projectId;

 @Before
 public void setup() {
@@ -83,13 +87,21 @@ public void setup() {
 NamedXContentRegistry.EMPTY
 );
 ClusterState state = clusterService.state();
+projectId = randomProjectIdOrDefault();
 ClusterServiceUtils.setState(
 clusterService,
 ClusterState.builder(state)
-.metadata(Metadata.builder(state.metadata()).indexTemplates(registry.getComposableTemplateConfigs()))
+.putProjectMetadata(ProjectMetadata.builder(projectId).indexTemplates(registry.getComposableTemplateConfigs()))
 .build()
 );
-historyStore = new ILMHistoryStore(client, clusterService, threadPool, ActionListener.noop(), TimeValue.timeValueMillis(500));
+historyStore = new ILMHistoryStore(
+client,
+clusterService,
+threadPool,
+TestProjectResolvers.usingRequestHeader(threadPool.getThreadContext()),
+ActionListener.noop(),
+TimeValue.timeValueMillis(500)
+);
 }

 @After
@@ -115,7 +127,7 @@ public void testNoActionIfDisabled() throws Exception {
 latch.countDown();
 return null;
 });
-historyStore.putAsync(record);
+historyStore.putAsync(projectId, record);
 assertFalse(latch.await(2, TimeUnit.SECONDS));
 }

@@ -156,7 +168,7 @@ public void testPut() throws Exception {
 );
 });

-historyStore.putAsync(record);
+historyStore.putAsync(projectId, record);
 assertBusy(() -> assertThat(calledTimes.get(), equalTo(1)));
 }

@@ -207,7 +219,7 @@ public void testPut() throws Exception {
 );
 });

-historyStore.putAsync(record);
+historyStore.putAsync(projectId, record);
 assertBusy(() -> assertThat(calledTimes.get(), equalTo(1)));
 }
 }
@@ -255,23 +267,32 @@ public void testMultipleFlushes() throws Exception {
 );
 return bulkItemResponse;
 });
-try (ILMHistoryStore localHistoryStore = new ILMHistoryStore(client, clusterService, threadPool, new ActionListener<>() {
-@Override
-public void onResponse(BulkResponse response) {
-int itemsInResponse = response.getItems().length;
-actions.addAndGet(itemsInResponse);
-for (int i = 0; i < itemsInResponse; i++) {
-latch.countDown();
-}
-logger.info("cumulative responses: {}", actions.get());
-}
+try (
+ILMHistoryStore localHistoryStore = new ILMHistoryStore(
+client,
+clusterService,
+threadPool,
+TestProjectResolvers.usingRequestHeader(threadPool.getThreadContext()),
+new ActionListener<>() {
+@Override
+public void onResponse(BulkResponse response) {
+int itemsInResponse = response.getItems().length;
+actions.addAndGet(itemsInResponse);
+for (int i = 0; i < itemsInResponse; i++) {
+latch.countDown();
+}
+logger.info("cumulative responses: {}", actions.get());
+}

-@Override
-public void onFailure(Exception e) {
-logger.error(e);
-fail(e.getMessage());
-}
-}, TimeValue.timeValueMillis(randomIntBetween(50, 1000)))) {
+@Override
+public void onFailure(Exception e) {
+logger.error(e);
+fail(e.getMessage());
+}
+},
+TimeValue.timeValueMillis(randomIntBetween(50, 1000))
+)
+) {
 for (int i = 0; i < numberOfDocs; i++) {
 ILMHistoryItem record1 = ILMHistoryItem.success(
 "index",
@@ -280,7 +301,7 @@ public void onFailure(Exception e) {
 10L,
 LifecycleExecutionState.builder().setPhase("phase").build()
 );
-localHistoryStore.putAsync(record1);
+localHistoryStore.putAsync(projectId, record1);
 }
 latch.await(5, TimeUnit.SECONDS);
 assertThat(actions.get(), equalTo(numberOfDocs));