diff --git a/modules/data-streams/src/internalClusterTest/java/org/elasticsearch/datastreams/lifecycle/DataStreamLifecycleServiceIT.java b/modules/data-streams/src/internalClusterTest/java/org/elasticsearch/datastreams/lifecycle/DataStreamLifecycleServiceIT.java index c987f0a5570c5..b2b5563059930 100644 --- a/modules/data-streams/src/internalClusterTest/java/org/elasticsearch/datastreams/lifecycle/DataStreamLifecycleServiceIT.java +++ b/modules/data-streams/src/internalClusterTest/java/org/elasticsearch/datastreams/lifecycle/DataStreamLifecycleServiceIT.java @@ -42,6 +42,7 @@ import org.elasticsearch.cluster.metadata.DataStreamLifecycle; import org.elasticsearch.cluster.metadata.DataStreamTestHelper; import org.elasticsearch.cluster.metadata.IndexMetadata; +import org.elasticsearch.cluster.metadata.Metadata; import org.elasticsearch.cluster.metadata.Template; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.service.ClusterService; @@ -594,7 +595,7 @@ public void testErrorRecordingOnRollover() throws Exception { Iterable lifecycleServices = internalCluster().getInstances(DataStreamLifecycleService.class); for (DataStreamLifecycleService lifecycleService : lifecycleServices) { - writeIndexRolloverError = lifecycleService.getErrorStore().getError(writeIndexName); + writeIndexRolloverError = lifecycleService.getErrorStore().getError(Metadata.DEFAULT_PROJECT_ID, writeIndexName); if (writeIndexRolloverError != null) { break; } @@ -671,7 +672,7 @@ public void testErrorRecordingOnRollover() throws Exception { Iterable lifecycleServices = internalCluster().getInstances(DataStreamLifecycleService.class); for (DataStreamLifecycleService lifecycleService : lifecycleServices) { - assertThat(lifecycleService.getErrorStore().getError(previousWriteInddex), nullValue()); + assertThat(lifecycleService.getErrorStore().getError(Metadata.DEFAULT_PROJECT_ID, previousWriteInddex), nullValue()); } }); @@ -768,7 +769,8 @@ public void testErrorRecordingOnRetention() throws Exception { Iterable lifecycleServices = internalCluster().getInstances(DataStreamLifecycleService.class); for (DataStreamLifecycleService lifecycleService : lifecycleServices) { - recordedRetentionExecutionError = lifecycleService.getErrorStore().getError(firstGenerationIndex); + recordedRetentionExecutionError = lifecycleService.getErrorStore() + .getError(Metadata.DEFAULT_PROJECT_ID, firstGenerationIndex); if (recordedRetentionExecutionError != null && recordedRetentionExecutionError.retryCount() > 3) { break; } @@ -832,7 +834,7 @@ public void testErrorRecordingOnRetention() throws Exception { // error stores don't contain anything for the first generation index anymore Iterable lifecycleServices = internalCluster().getInstances(DataStreamLifecycleService.class); for (DataStreamLifecycleService lifecycleService : lifecycleServices) { - assertThat(lifecycleService.getErrorStore().getError(firstGenerationIndex), nullValue()); + assertThat(lifecycleService.getErrorStore().getError(Metadata.DEFAULT_PROJECT_ID, firstGenerationIndex), nullValue()); } }); diff --git a/modules/data-streams/src/main/java/org/elasticsearch/datastreams/lifecycle/DataStreamLifecycleErrorStore.java b/modules/data-streams/src/main/java/org/elasticsearch/datastreams/lifecycle/DataStreamLifecycleErrorStore.java index 64afe1840d9af..236f7fbde61af 100644 --- a/modules/data-streams/src/main/java/org/elasticsearch/datastreams/lifecycle/DataStreamLifecycleErrorStore.java +++ b/modules/data-streams/src/main/java/org/elasticsearch/datastreams/lifecycle/DataStreamLifecycleErrorStore.java @@ -11,6 +11,7 @@ import org.elasticsearch.ElasticsearchException; import org.elasticsearch.action.datastreams.lifecycle.ErrorEntry; +import org.elasticsearch.cluster.metadata.ProjectId; import org.elasticsearch.common.Strings; import org.elasticsearch.core.Nullable; import org.elasticsearch.health.node.DslErrorInfo; @@ -34,7 +35,7 @@ public class DataStreamLifecycleErrorStore { public static final int MAX_ERROR_MESSAGE_LENGTH = 1000; - private final ConcurrentMap indexNameToError = new ConcurrentHashMap<>(); + private final ConcurrentMap> projectMap = new ConcurrentHashMap<>(); private final LongSupplier nowSupplier; public DataStreamLifecycleErrorStore(LongSupplier nowSupplier) { @@ -48,12 +49,13 @@ public DataStreamLifecycleErrorStore(LongSupplier nowSupplier) { * Returns the previously recorded error for the provided index, or null otherwise. */ @Nullable - public ErrorEntry recordError(String indexName, Exception e) { + public ErrorEntry recordError(ProjectId projectId, String indexName, Exception e) { String exceptionToString = Strings.toString((builder, params) -> { ElasticsearchException.generateThrowableXContent(builder, EMPTY_PARAMS, e); return builder; }); String newError = Strings.substring(exceptionToString, 0, MAX_ERROR_MESSAGE_LENGTH); + final var indexNameToError = projectMap.computeIfAbsent(projectId, k -> new ConcurrentHashMap<>()); ErrorEntry existingError = indexNameToError.get(indexName); long recordedTimestamp = nowSupplier.getAsLong(); if (existingError == null) { @@ -71,7 +73,11 @@ public ErrorEntry recordError(String indexName, Exception e) { /** * Clears the recorded error for the provided index (if any exists) */ - public void clearRecordedError(String indexName) { + public void clearRecordedError(ProjectId projectId, String indexName) { + final var indexNameToError = projectMap.get(projectId); + if (indexNameToError == null) { + return; + } indexNameToError.remove(indexName); } @@ -79,21 +85,29 @@ public void clearRecordedError(String indexName) { * Clears all the errors recorded in the store. */ public void clearStore() { - indexNameToError.clear(); + projectMap.clear(); } /** * Retrieves the recorded error for the provided index. */ @Nullable - public ErrorEntry getError(String indexName) { + public ErrorEntry getError(ProjectId projectId, String indexName) { + final var indexNameToError = projectMap.get(projectId); + if (indexNameToError == null) { + return null; + } return indexNameToError.get(indexName); } /** * Return an immutable view (a snapshot) of the tracked indices at the moment this method is called. */ - public Set getAllIndices() { + public Set getAllIndices(ProjectId projectId) { + final var indexNameToError = projectMap.get(projectId); + if (indexNameToError == null) { + return Set.of(); + } return Set.copyOf(indexNameToError.keySet()); } @@ -103,8 +117,9 @@ public Set getAllIndices() { * retries DSL attempted (descending order) and the number of entries will be limited according to the provided limit parameter. * Returns empty list if no entries are present in the error store or none satisfy the predicate. */ - public List getErrorsInfo(Predicate errorEntryPredicate, int limit) { - if (indexNameToError.isEmpty()) { + public List getErrorsInfo(ProjectId projectId, Predicate errorEntryPredicate, int limit) { + final var indexNameToError = projectMap.get(projectId); + if (indexNameToError == null || indexNameToError.isEmpty()) { return List.of(); } return indexNameToError.entrySet() diff --git a/modules/data-streams/src/main/java/org/elasticsearch/datastreams/lifecycle/DataStreamLifecycleService.java b/modules/data-streams/src/main/java/org/elasticsearch/datastreams/lifecycle/DataStreamLifecycleService.java index a41ed15f79d35..29d5d13dea346 100644 --- a/modules/data-streams/src/main/java/org/elasticsearch/datastreams/lifecycle/DataStreamLifecycleService.java +++ b/modules/data-streams/src/main/java/org/elasticsearch/datastreams/lifecycle/DataStreamLifecycleService.java @@ -784,7 +784,8 @@ static List getTargetIndices( */ private void clearErrorStoreForUnmanagedIndices(DataStream dataStream) { Metadata metadata = clusterService.state().metadata(); - for (String indexName : errorStore.getAllIndices()) { + final var projectId = metadata.getProject().id(); + for (String indexName : errorStore.getAllIndices(projectId)) { IndexAbstraction indexAbstraction = metadata.getProject().getIndicesLookup().get(indexName); DataStream parentDataStream = indexAbstraction != null ? indexAbstraction.getParentDataStream() : null; if (indexAbstraction == null || parentDataStream == null) { @@ -792,13 +793,13 @@ private void clearErrorStoreForUnmanagedIndices(DataStream dataStream) { "Clearing recorded error for index [{}] because the index doesn't exist or is not a data stream backing index anymore", indexName ); - errorStore.clearRecordedError(indexName); + errorStore.clearRecordedError(projectId, indexName); } else if (parentDataStream.getName().equals(dataStream.getName())) { // we're only verifying the indices that pertain to this data stream IndexMetadata indexMeta = metadata.getProject().index(indexName); if (dataStream.isIndexManagedByDataStreamLifecycle(indexMeta.getIndex(), metadata.getProject()::index) == false) { logger.trace("Clearing recorded error for index [{}] because the index is not managed by DSL anymore", indexName); - errorStore.clearRecordedError(indexName); + errorStore.clearRecordedError(projectId, indexName); } } } @@ -866,7 +867,7 @@ private Index maybeExecuteRollover(ClusterState state, DataStream dataStream, bo if (latestDataStream.getWriteIndex().getName().equals(currentRunWriteIndex.getName())) { // data stream has not been rolled over in the meantime so record the error against the write index we // attempted the rollover - errorStore.recordError(currentRunWriteIndex.getName(), e); + errorStore.recordError(clusterService.state().metadata().getProject().id(), currentRunWriteIndex.getName(), e); } } } @@ -1074,7 +1075,7 @@ public void onFailure(Exception e) { if (e instanceof IndexNotFoundException) { // index was already deleted, treat this as a success logger.trace("Clearing recorded error for index [{}] because the index was deleted", targetIndex); - errorStore.clearRecordedError(targetIndex); + errorStore.clearRecordedError(clusterService.state().metadata().getProject().id(), targetIndex); listener.onResponse(null); return; } @@ -1157,7 +1158,7 @@ public void onFailure(Exception e) { if (e instanceof IndexNotFoundException) { // index was already deleted, treat this as a success logger.trace("Clearing recorded error for index [{}] because the index was deleted", targetIndex); - errorStore.clearRecordedError(targetIndex); + errorStore.clearRecordedError(clusterService.state().metadata().getProject().id(), targetIndex); listener.onResponse(null); return; } @@ -1193,7 +1194,7 @@ public void onFailure(Exception e) { if (e instanceof IndexNotFoundException) { logger.trace("Data stream lifecycle did not delete index [{}] as it was already deleted", targetIndex); // index was already deleted, treat this as a success - errorStore.clearRecordedError(targetIndex); + errorStore.clearRecordedError(clusterService.state().metadata().getProject().id(), targetIndex); listener.onResponse(null); return; } @@ -1341,7 +1342,9 @@ static class ErrorRecordingActionListener implements ActionListener { @Override public void onResponse(Void unused) { logger.trace("Clearing recorded error for index [{}] because the [{}] action was successful", targetIndex, actionName); - errorStore.clearRecordedError(targetIndex); + @FixForMultiProject(description = "Don't use default project ID") + final var projectId = Metadata.DEFAULT_PROJECT_ID; + errorStore.clearRecordedError(projectId, targetIndex); } @Override @@ -1364,8 +1367,10 @@ static void recordAndLogError( String logMessage, int signallingErrorRetryThreshold ) { - ErrorEntry previousError = errorStore.recordError(targetIndex, e); - ErrorEntry currentError = errorStore.getError(targetIndex); + @FixForMultiProject(description = "Don't use default project ID") + final var projectId = Metadata.DEFAULT_PROJECT_ID; + ErrorEntry previousError = errorStore.recordError(projectId, targetIndex, e); + ErrorEntry currentError = errorStore.getError(projectId, targetIndex); if (previousError == null || (currentError != null && previousError.error().equals(currentError.error()) == false)) { logger.error(logMessage, e); } else { diff --git a/modules/data-streams/src/main/java/org/elasticsearch/datastreams/lifecycle/action/TransportExplainDataStreamLifecycleAction.java b/modules/data-streams/src/main/java/org/elasticsearch/datastreams/lifecycle/action/TransportExplainDataStreamLifecycleAction.java index f5bcb4d6462e4..db7c70c127bb0 100644 --- a/modules/data-streams/src/main/java/org/elasticsearch/datastreams/lifecycle/action/TransportExplainDataStreamLifecycleAction.java +++ b/modules/data-streams/src/main/java/org/elasticsearch/datastreams/lifecycle/action/TransportExplainDataStreamLifecycleAction.java @@ -113,7 +113,7 @@ protected void masterOperation( rolloverInfo == null ? null : rolloverInfo.getTime(), generationDate, parentDataStream.getLifecycle(), - errorStore.getError(index) + errorStore.getError(state.projectId(), index) ); explainIndices.add(explainIndexDataStreamLifecycle); } diff --git a/modules/data-streams/src/main/java/org/elasticsearch/datastreams/lifecycle/action/TransportGetDataStreamLifecycleStatsAction.java b/modules/data-streams/src/main/java/org/elasticsearch/datastreams/lifecycle/action/TransportGetDataStreamLifecycleStatsAction.java index 52ffdba26e0f1..bc34d0cfed3b9 100644 --- a/modules/data-streams/src/main/java/org/elasticsearch/datastreams/lifecycle/action/TransportGetDataStreamLifecycleStatsAction.java +++ b/modules/data-streams/src/main/java/org/elasticsearch/datastreams/lifecycle/action/TransportGetDataStreamLifecycleStatsAction.java @@ -10,12 +10,13 @@ import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.support.ActionFilters; -import org.elasticsearch.action.support.master.TransportMasterNodeReadAction; -import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.action.support.master.TransportMasterNodeReadProjectAction; +import org.elasticsearch.cluster.ProjectState; import org.elasticsearch.cluster.block.ClusterBlockException; import org.elasticsearch.cluster.block.ClusterBlockLevel; import org.elasticsearch.cluster.metadata.DataStream; -import org.elasticsearch.cluster.metadata.Metadata; +import org.elasticsearch.cluster.metadata.ProjectMetadata; +import org.elasticsearch.cluster.project.ProjectResolver; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.util.concurrent.EsExecutors; import org.elasticsearch.datastreams.lifecycle.DataStreamLifecycleService; @@ -33,7 +34,7 @@ /** * Exposes stats about the latest lifecycle run and the error store. */ -public class TransportGetDataStreamLifecycleStatsAction extends TransportMasterNodeReadAction< +public class TransportGetDataStreamLifecycleStatsAction extends TransportMasterNodeReadProjectAction< GetDataStreamLifecycleStatsAction.Request, GetDataStreamLifecycleStatsAction.Response> { @@ -45,7 +46,8 @@ public TransportGetDataStreamLifecycleStatsAction( ClusterService clusterService, ThreadPool threadPool, ActionFilters actionFilters, - DataStreamLifecycleService lifecycleService + DataStreamLifecycleService lifecycleService, + ProjectResolver projectResolver ) { super( GetDataStreamLifecycleStatsAction.NAME, @@ -54,6 +56,7 @@ public TransportGetDataStreamLifecycleStatsAction( threadPool, actionFilters, GetDataStreamLifecycleStatsAction.Request::new, + projectResolver, GetDataStreamLifecycleStatsAction.Response::new, EsExecutors.DIRECT_EXECUTOR_SERVICE ); @@ -64,23 +67,22 @@ public TransportGetDataStreamLifecycleStatsAction( protected void masterOperation( Task task, GetDataStreamLifecycleStatsAction.Request request, - ClusterState state, + ProjectState state, ActionListener listener ) throws Exception { - listener.onResponse(collectStats(state)); + listener.onResponse(collectStats(state.metadata())); } // Visible for testing - GetDataStreamLifecycleStatsAction.Response collectStats(ClusterState state) { - Metadata metadata = state.metadata(); - Set indicesInErrorStore = lifecycleService.getErrorStore().getAllIndices(); + GetDataStreamLifecycleStatsAction.Response collectStats(ProjectMetadata project) { + Set indicesInErrorStore = lifecycleService.getErrorStore().getAllIndices(project.id()); List dataStreamStats = new ArrayList<>(); - for (DataStream dataStream : state.metadata().getProject().dataStreams().values()) { + for (DataStream dataStream : project.dataStreams().values()) { if (dataStream.getLifecycle() != null && dataStream.getLifecycle().enabled()) { int total = 0; int inError = 0; for (Index index : dataStream.getIndices()) { - if (dataStream.isIndexManagedByDataStreamLifecycle(index, metadata.getProject()::index)) { + if (dataStream.isIndexManagedByDataStreamLifecycle(index, project::index)) { total++; if (indicesInErrorStore.contains(index.getName())) { inError++; @@ -102,7 +104,7 @@ GetDataStreamLifecycleStatsAction.Response collectStats(ClusterState state) { } @Override - protected ClusterBlockException checkBlock(GetDataStreamLifecycleStatsAction.Request request, ClusterState state) { + protected ClusterBlockException checkBlock(GetDataStreamLifecycleStatsAction.Request request, ProjectState state) { return state.blocks().globalBlockedException(ClusterBlockLevel.METADATA_READ); } } diff --git a/modules/data-streams/src/main/java/org/elasticsearch/datastreams/lifecycle/health/DataStreamLifecycleHealthInfoPublisher.java b/modules/data-streams/src/main/java/org/elasticsearch/datastreams/lifecycle/health/DataStreamLifecycleHealthInfoPublisher.java index 71575ee88aa7d..ef7842ea678b5 100644 --- a/modules/data-streams/src/main/java/org/elasticsearch/datastreams/lifecycle/health/DataStreamLifecycleHealthInfoPublisher.java +++ b/modules/data-streams/src/main/java/org/elasticsearch/datastreams/lifecycle/health/DataStreamLifecycleHealthInfoPublisher.java @@ -14,10 +14,12 @@ import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.support.master.AcknowledgedResponse; import org.elasticsearch.client.internal.Client; +import org.elasticsearch.cluster.metadata.Metadata; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.settings.Setting; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.core.FixForMultiProject; import org.elasticsearch.datastreams.lifecycle.DataStreamLifecycleErrorStore; import org.elasticsearch.health.node.DataStreamLifecycleHealthInfo; import org.elasticsearch.health.node.DslErrorInfo; @@ -83,9 +85,12 @@ private void updateNumberOfErrorsToPublish(int newValue) { * {@link org.elasticsearch.datastreams.lifecycle.DataStreamLifecycleService#DATA_STREAM_SIGNALLING_ERROR_RETRY_INTERVAL_SETTING} */ public void publishDslErrorEntries(ActionListener actionListener) { + @FixForMultiProject(description = "Once the health API becomes project-aware, we shouldn't use the default project ID") + final var projectId = Metadata.DEFAULT_PROJECT_ID; // fetching the entries that persist in the error store for more than the signalling retry interval // note that we're reporting this view into the error store on every publishing iteration List errorEntriesToSignal = errorStore.getErrorsInfo( + projectId, entry -> entry.retryCount() >= signallingErrorRetryInterval, maxNumberOfErrorsToPublish ); @@ -97,7 +102,7 @@ public void publishDslErrorEntries(ActionListener actionLi UpdateHealthInfoCacheAction.INSTANCE, new UpdateHealthInfoCacheAction.Request( healthNodeId, - new DataStreamLifecycleHealthInfo(errorEntriesToSignal, errorStore.getAllIndices().size()) + new DataStreamLifecycleHealthInfo(errorEntriesToSignal, errorStore.getAllIndices(projectId).size()) ), actionListener ); diff --git a/modules/data-streams/src/test/java/org/elasticsearch/datastreams/lifecycle/DataStreamLifecycleErrorStoreTests.java b/modules/data-streams/src/test/java/org/elasticsearch/datastreams/lifecycle/DataStreamLifecycleErrorStoreTests.java index c33bd8482710b..defbb0ba02b4b 100644 --- a/modules/data-streams/src/test/java/org/elasticsearch/datastreams/lifecycle/DataStreamLifecycleErrorStoreTests.java +++ b/modules/data-streams/src/test/java/org/elasticsearch/datastreams/lifecycle/DataStreamLifecycleErrorStoreTests.java @@ -10,6 +10,7 @@ package org.elasticsearch.datastreams.lifecycle; import org.elasticsearch.action.datastreams.lifecycle.ErrorEntry; +import org.elasticsearch.cluster.metadata.ProjectId; import org.elasticsearch.health.node.DslErrorInfo; import org.elasticsearch.test.ESTestCase; import org.junit.Before; @@ -30,33 +31,37 @@ public class DataStreamLifecycleErrorStoreTests extends ESTestCase { private DataStreamLifecycleErrorStore errorStore; + private ProjectId projectId; @Before public void setupServices() { errorStore = new DataStreamLifecycleErrorStore(System::currentTimeMillis); + projectId = randomProjectIdOrDefault(); } public void testRecordAndRetrieveError() { - ErrorEntry existingRecordedError = errorStore.recordError("test", new NullPointerException("testing")); + ErrorEntry existingRecordedError = errorStore.recordError(projectId, "test", new NullPointerException("testing")); assertThat(existingRecordedError, is(nullValue())); - assertThat(errorStore.getError("test"), is(notNullValue())); - assertThat(errorStore.getAllIndices().size(), is(1)); - assertThat(errorStore.getAllIndices(), hasItem("test")); + assertThat(errorStore.getError(projectId, "test"), is(notNullValue())); + assertThat(errorStore.getAllIndices(projectId).size(), is(1)); + assertThat(errorStore.getAllIndices(projectId), hasItem("test")); - existingRecordedError = errorStore.recordError("test", new IllegalStateException("bad state")); + existingRecordedError = errorStore.recordError(projectId, "test", new IllegalStateException("bad state")); assertThat(existingRecordedError, is(notNullValue())); assertThat(existingRecordedError.error(), containsString("testing")); } public void testRetrieveAfterClear() { - errorStore.recordError("test", new NullPointerException("testing")); + errorStore.recordError(projectId, "test", new NullPointerException("testing")); errorStore.clearStore(); - assertThat(errorStore.getError("test"), is(nullValue())); + assertThat(errorStore.getError(projectId, "test"), is(nullValue())); } public void testGetAllIndicesIsASnapshotViewOfTheStore() { - Stream.iterate(0, i -> i + 1).limit(5).forEach(i -> errorStore.recordError("test" + i, new NullPointerException("testing"))); - Set initialAllIndices = errorStore.getAllIndices(); + Stream.iterate(0, i -> i + 1) + .limit(5) + .forEach(i -> errorStore.recordError(projectId, "test" + i, new NullPointerException("testing"))); + Set initialAllIndices = errorStore.getAllIndices(projectId); assertThat(initialAllIndices.size(), is(5)); assertThat( initialAllIndices, @@ -64,9 +69,11 @@ public void testGetAllIndicesIsASnapshotViewOfTheStore() { ); // let's add some more items to the store and clear a couple of the initial ones - Stream.iterate(5, i -> i + 1).limit(5).forEach(i -> errorStore.recordError("test" + i, new NullPointerException("testing"))); - errorStore.clearRecordedError("test0"); - errorStore.clearRecordedError("test1"); + Stream.iterate(5, i -> i + 1) + .limit(5) + .forEach(i -> errorStore.recordError(projectId, "test" + i, new NullPointerException("testing"))); + errorStore.clearRecordedError(projectId, "test0"); + errorStore.clearRecordedError(projectId, "test1"); // the initial list should remain unchanged assertThat(initialAllIndices.size(), is(5)); assertThat( @@ -75,42 +82,42 @@ public void testGetAllIndicesIsASnapshotViewOfTheStore() { ); // calling getAllIndices again should reflect the latest state - assertThat(errorStore.getAllIndices().size(), is(8)); + assertThat(errorStore.getAllIndices(projectId).size(), is(8)); assertThat( - errorStore.getAllIndices(), + errorStore.getAllIndices(projectId), containsInAnyOrder(Stream.iterate(2, i -> i + 1).limit(8).map(i -> "test" + i).toArray(String[]::new)) ); } public void testRecordedErrorIsMaxOneThousandChars() { NullPointerException exceptionWithLongMessage = new NullPointerException(randomAlphaOfLength(2000)); - errorStore.recordError("test", exceptionWithLongMessage); - assertThat(errorStore.getError("test"), is(notNullValue())); - assertThat(errorStore.getError("test").error().length(), is(MAX_ERROR_MESSAGE_LENGTH)); + errorStore.recordError(projectId, "test", exceptionWithLongMessage); + assertThat(errorStore.getError(projectId, "test"), is(notNullValue())); + assertThat(errorStore.getError(projectId, "test").error().length(), is(MAX_ERROR_MESSAGE_LENGTH)); } public void testGetFilteredEntries() { - IntStream.range(0, 20).forEach(i -> errorStore.recordError("test20", new NullPointerException("testing"))); - IntStream.range(0, 5).forEach(i -> errorStore.recordError("test5", new NullPointerException("testing"))); + IntStream.range(0, 20).forEach(i -> errorStore.recordError(projectId, "test20", new NullPointerException("testing"))); + IntStream.range(0, 5).forEach(i -> errorStore.recordError(projectId, "test5", new NullPointerException("testing"))); { - List entries = errorStore.getErrorsInfo(entry -> entry.retryCount() > 7, 100); + List entries = errorStore.getErrorsInfo(projectId, entry -> entry.retryCount() > 7, 100); assertThat(entries.size(), is(1)); assertThat(entries.get(0).indexName(), is("test20")); } { - List entries = errorStore.getErrorsInfo(entry -> entry.retryCount() > 7, 0); + List entries = errorStore.getErrorsInfo(projectId, entry -> entry.retryCount() > 7, 0); assertThat(entries.size(), is(0)); } { - List entries = errorStore.getErrorsInfo(entry -> entry.retryCount() > 50, 100); + List entries = errorStore.getErrorsInfo(projectId, entry -> entry.retryCount() > 50, 100); assertThat(entries.size(), is(0)); } { - List entries = errorStore.getErrorsInfo(entry -> entry.retryCount() > 2, 100); + List entries = errorStore.getErrorsInfo(projectId, entry -> entry.retryCount() > 2, 100); assertThat(entries.size(), is(2)); assertThat(entries.get(0).indexName(), is("test20")); assertThat(entries.get(1).indexName(), is("test5")); diff --git a/modules/data-streams/src/test/java/org/elasticsearch/datastreams/lifecycle/DataStreamLifecycleServiceTests.java b/modules/data-streams/src/test/java/org/elasticsearch/datastreams/lifecycle/DataStreamLifecycleServiceTests.java index cfab412b45afc..2598735f270d5 100644 --- a/modules/data-streams/src/test/java/org/elasticsearch/datastreams/lifecycle/DataStreamLifecycleServiceTests.java +++ b/modules/data-streams/src/test/java/org/elasticsearch/datastreams/lifecycle/DataStreamLifecycleServiceTests.java @@ -505,7 +505,8 @@ public void testDeletedIndicesAreRemovedFromTheErrorStore() throws IOException { // all backing indices are in the error store for (Index index : dataStream.getIndices()) { - dataStreamLifecycleService.getErrorStore().recordError(index.getName(), new NullPointerException("bad")); + dataStreamLifecycleService.getErrorStore() + .recordError(Metadata.DEFAULT_PROJECT_ID, index.getName(), new NullPointerException("bad")); } Index writeIndex = dataStream.getWriteIndex(); // all indices but the write index are deleted @@ -527,10 +528,16 @@ public void testDeletedIndicesAreRemovedFromTheErrorStore() throws IOException { dataStreamLifecycleService.run(stateWithDeletedIndices); for (Index deletedIndex : deletedIndices) { - assertThat(dataStreamLifecycleService.getErrorStore().getError(deletedIndex.getName()), nullValue()); + assertThat( + dataStreamLifecycleService.getErrorStore().getError(Metadata.DEFAULT_PROJECT_ID, deletedIndex.getName()), + nullValue() + ); } // the value for the write index should still be in the error store - assertThat(dataStreamLifecycleService.getErrorStore().getError(dataStream.getWriteIndex().getName()), notNullValue()); + assertThat( + dataStreamLifecycleService.getErrorStore().getError(Metadata.DEFAULT_PROJECT_ID, dataStream.getWriteIndex().getName()), + notNullValue() + ); } public void testErrorStoreIsClearedOnBackingIndexBecomingUnmanaged() { @@ -547,7 +554,8 @@ public void testErrorStoreIsClearedOnBackingIndexBecomingUnmanaged() { ); // all backing indices are in the error store for (Index index : dataStream.getIndices()) { - dataStreamLifecycleService.getErrorStore().recordError(index.getName(), new NullPointerException("bad")); + dataStreamLifecycleService.getErrorStore() + .recordError(Metadata.DEFAULT_PROJECT_ID, index.getName(), new NullPointerException("bad")); } builder.put(dataStream); ClusterState state = ClusterState.builder(ClusterName.DEFAULT).metadata(builder).build(); @@ -568,7 +576,7 @@ public void testErrorStoreIsClearedOnBackingIndexBecomingUnmanaged() { dataStreamLifecycleService.run(updatedState); for (Index index : dataStream.getIndices()) { - assertThat(dataStreamLifecycleService.getErrorStore().getError(index.getName()), nullValue()); + assertThat(dataStreamLifecycleService.getErrorStore().getError(Metadata.DEFAULT_PROJECT_ID, index.getName()), nullValue()); } } @@ -585,7 +593,8 @@ public void testBackingIndicesFromMultipleDataStreamsInErrorStore() { ); // all backing indices are in the error store for (Index index : ilmManagedDataStream.getIndices()) { - dataStreamLifecycleService.getErrorStore().recordError(index.getName(), new NullPointerException("will be ILM managed soon")); + dataStreamLifecycleService.getErrorStore() + .recordError(Metadata.DEFAULT_PROJECT_ID, index.getName(), new NullPointerException("will be ILM managed soon")); } String dataStreamWithBackingIndicesInErrorState = randomAlphaOfLength(15).toLowerCase(Locale.ROOT); DataStream dslManagedDataStream = createDataStream( @@ -598,7 +607,8 @@ public void testBackingIndicesFromMultipleDataStreamsInErrorStore() { ); // put all backing indices in the error store for (Index index : dslManagedDataStream.getIndices()) { - dataStreamLifecycleService.getErrorStore().recordError(index.getName(), new NullPointerException("dsl managed index")); + dataStreamLifecycleService.getErrorStore() + .recordError(Metadata.DEFAULT_PROJECT_ID, index.getName(), new NullPointerException("dsl managed index")); } builder.put(ilmManagedDataStream); builder.put(dslManagedDataStream); @@ -620,10 +630,10 @@ public void testBackingIndicesFromMultipleDataStreamsInErrorStore() { dataStreamLifecycleService.run(updatedState); for (Index index : dslManagedDataStream.getIndices()) { - assertThat(dataStreamLifecycleService.getErrorStore().getError(index.getName()), notNullValue()); + assertThat(dataStreamLifecycleService.getErrorStore().getError(Metadata.DEFAULT_PROJECT_ID, index.getName()), notNullValue()); } for (Index index : ilmManagedDataStream.getIndices()) { - assertThat(dataStreamLifecycleService.getErrorStore().getError(index.getName()), nullValue()); + assertThat(dataStreamLifecycleService.getErrorStore().getError(Metadata.DEFAULT_PROJECT_ID, index.getName()), nullValue()); } } @@ -1402,7 +1412,7 @@ public void testDownsamplingWhenTargetIndexNameClashYieldsException() throws Exc dataStreamLifecycleService.maybeExecuteDownsampling(clusterService.state(), dataStream, List.of(firstGenIndex)); assertThat(clientSeenRequests.size(), is(0)); - ErrorEntry error = dataStreamLifecycleService.getErrorStore().getError(firstGenIndexName); + ErrorEntry error = dataStreamLifecycleService.getErrorStore().getError(Metadata.DEFAULT_PROJECT_ID, firstGenIndexName); assertThat(error, notNullValue()); assertThat(error.error(), containsString("resource_already_exists_exception")); } diff --git a/modules/data-streams/src/test/java/org/elasticsearch/datastreams/lifecycle/action/TransportGetDataStreamLifecycleStatsActionTests.java b/modules/data-streams/src/test/java/org/elasticsearch/datastreams/lifecycle/action/TransportGetDataStreamLifecycleStatsActionTests.java index 3be1be12487b9..6240ac7fd15e9 100644 --- a/modules/data-streams/src/test/java/org/elasticsearch/datastreams/lifecycle/action/TransportGetDataStreamLifecycleStatsActionTests.java +++ b/modules/data-streams/src/test/java/org/elasticsearch/datastreams/lifecycle/action/TransportGetDataStreamLifecycleStatsActionTests.java @@ -12,12 +12,11 @@ import org.elasticsearch.action.admin.indices.rollover.MaxAgeCondition; import org.elasticsearch.action.admin.indices.rollover.RolloverInfo; import org.elasticsearch.action.support.ActionFilters; -import org.elasticsearch.cluster.ClusterName; -import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.metadata.DataStream; import org.elasticsearch.cluster.metadata.DataStreamLifecycle; import org.elasticsearch.cluster.metadata.IndexMetadata; -import org.elasticsearch.cluster.metadata.Metadata; +import org.elasticsearch.cluster.metadata.ProjectMetadata; +import org.elasticsearch.cluster.project.TestProjectResolvers; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.core.TimeValue; @@ -39,6 +38,7 @@ import static org.elasticsearch.cluster.metadata.DataStreamTestHelper.newInstance; import static org.elasticsearch.datastreams.lifecycle.DataStreamLifecycleFixtures.createDataStream; import static org.hamcrest.Matchers.is; +import static org.mockito.ArgumentMatchers.any; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; @@ -51,7 +51,8 @@ public class TransportGetDataStreamLifecycleStatsActionTests extends ESTestCase mock(ClusterService.class), mock(ThreadPool.class), mock(ActionFilters.class), - dataStreamLifecycleService + dataStreamLifecycleService, + TestProjectResolvers.singleProjectOnly() ); private Long lastRunDuration; private Long timeBetweenStarts; @@ -64,11 +65,11 @@ public void setUp() throws Exception { when(dataStreamLifecycleService.getLastRunDuration()).thenReturn(lastRunDuration); when(dataStreamLifecycleService.getTimeBetweenStarts()).thenReturn(timeBetweenStarts); when(dataStreamLifecycleService.getErrorStore()).thenReturn(errorStore); - when(errorStore.getAllIndices()).thenReturn(Set.of()); + when(errorStore.getAllIndices(any())).thenReturn(Set.of()); } public void testEmptyClusterState() { - GetDataStreamLifecycleStatsAction.Response response = action.collectStats(ClusterState.EMPTY_STATE); + GetDataStreamLifecycleStatsAction.Response response = action.collectStats(ProjectMetadata.builder(randomUniqueProjectId()).build()); assertThat(response.getRunDuration(), is(lastRunDuration)); assertThat(response.getTimeBetweenStarts(), is(timeBetweenStarts)); assertThat(response.getDataStreamStats().isEmpty(), is(true)); @@ -77,7 +78,7 @@ public void testEmptyClusterState() { public void testMixedDataStreams() { Set indicesInError = new HashSet<>(); int numBackingIndices = 3; - Metadata.Builder builder = Metadata.builder(); + ProjectMetadata.Builder builder = ProjectMetadata.builder(randomProjectIdOrDefault()); DataStream ilmDataStream = createDataStream( builder, "ilm-managed-index", @@ -132,9 +133,9 @@ public void testMixedDataStreams() { backingIndices.add(indexMetadata.getIndex()); builder.put(newInstance(dataStreamName, backingIndices, 3, null, false, DataStreamLifecycle.builder().build())); } - ClusterState state = ClusterState.builder(ClusterName.DEFAULT).metadata(builder).build(); - when(errorStore.getAllIndices()).thenReturn(indicesInError); - GetDataStreamLifecycleStatsAction.Response response = action.collectStats(state); + ProjectMetadata project = builder.build(); + when(errorStore.getAllIndices(project.id())).thenReturn(indicesInError); + GetDataStreamLifecycleStatsAction.Response response = action.collectStats(project); assertThat(response.getRunDuration(), is(lastRunDuration)); assertThat(response.getTimeBetweenStarts(), is(timeBetweenStarts)); assertThat(response.getDataStreamStats().size(), is(2)); diff --git a/modules/data-streams/src/test/java/org/elasticsearch/datastreams/lifecycle/health/DataStreamLifecycleHealthInfoPublisherTests.java b/modules/data-streams/src/test/java/org/elasticsearch/datastreams/lifecycle/health/DataStreamLifecycleHealthInfoPublisherTests.java index f8a2ac3c61029..f37db3a9a60b2 100644 --- a/modules/data-streams/src/test/java/org/elasticsearch/datastreams/lifecycle/health/DataStreamLifecycleHealthInfoPublisherTests.java +++ b/modules/data-streams/src/test/java/org/elasticsearch/datastreams/lifecycle/health/DataStreamLifecycleHealthInfoPublisherTests.java @@ -17,6 +17,7 @@ import org.elasticsearch.action.support.replication.ClusterStateCreationUtils; import org.elasticsearch.client.internal.Client; import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.metadata.Metadata; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.node.DiscoveryNodeRole; import org.elasticsearch.cluster.node.DiscoveryNodeUtils; @@ -24,6 +25,7 @@ import org.elasticsearch.common.settings.ClusterSettings; import org.elasticsearch.common.settings.Setting; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.core.FixForMultiProject; import org.elasticsearch.datastreams.lifecycle.DataStreamLifecycleErrorStore; import org.elasticsearch.datastreams.lifecycle.DataStreamLifecycleService; import org.elasticsearch.health.node.DataStreamLifecycleHealthInfo; @@ -91,10 +93,12 @@ public void cleanup() { } public void testPublishDslErrorEntries() { + @FixForMultiProject(description = "Once the health API becomes project-aware, we shouldn't use the default project ID") + final var projectId = Metadata.DEFAULT_PROJECT_ID; for (int i = 0; i < 11; i++) { - errorStore.recordError("testIndexOverSignalThreshold", new NullPointerException("ouch")); + errorStore.recordError(projectId, "testIndexOverSignalThreshold", new NullPointerException("ouch")); } - errorStore.recordError("testIndex", new IllegalStateException("bad state")); + errorStore.recordError(projectId, "testIndex", new IllegalStateException("bad state")); ClusterState stateWithHealthNode = ClusterStateCreationUtils.state(node1, node1, node1, allNodes); ClusterServiceUtils.setState(clusterService, stateWithHealthNode); dslHealthInfoPublisher.publishDslErrorEntries(new ActionListener<>() { @@ -117,11 +121,13 @@ public void onFailure(Exception e) { } public void testPublishDslErrorEntriesNoHealthNode() { + @FixForMultiProject(description = "Once the health API becomes project-aware, we shouldn't use the default project ID") + final var projectId = Metadata.DEFAULT_PROJECT_ID; // no requests are being executed for (int i = 0; i < 11; i++) { - errorStore.recordError("testIndexOverSignalThreshold", new NullPointerException("ouch")); + errorStore.recordError(projectId, "testIndexOverSignalThreshold", new NullPointerException("ouch")); } - errorStore.recordError("testIndex", new IllegalStateException("bad state")); + errorStore.recordError(projectId, "testIndex", new IllegalStateException("bad state")); ClusterState stateNoHealthNode = ClusterStateCreationUtils.state(node1, node1, null, allNodes); ClusterServiceUtils.setState(clusterService, stateNoHealthNode); diff --git a/x-pack/plugin/security/src/internalClusterTest/java/org/elasticsearch/integration/DataStreamLifecycleDownsamplingSecurityIT.java b/x-pack/plugin/security/src/internalClusterTest/java/org/elasticsearch/integration/DataStreamLifecycleDownsamplingSecurityIT.java index 0ba7951b8b54d..6ce26240dd542 100644 --- a/x-pack/plugin/security/src/internalClusterTest/java/org/elasticsearch/integration/DataStreamLifecycleDownsamplingSecurityIT.java +++ b/x-pack/plugin/security/src/internalClusterTest/java/org/elasticsearch/integration/DataStreamLifecycleDownsamplingSecurityIT.java @@ -26,6 +26,7 @@ import org.elasticsearch.cluster.metadata.ComposableIndexTemplate; import org.elasticsearch.cluster.metadata.DataStreamLifecycle; import org.elasticsearch.cluster.metadata.IndexMetadata; +import org.elasticsearch.cluster.metadata.Metadata; import org.elasticsearch.cluster.metadata.Template; import org.elasticsearch.common.Strings; import org.elasticsearch.common.compress.CompressedXContent; @@ -243,9 +244,9 @@ private Map collectErrorsFromStoreAsMap() { Map indicesAndErrors = new HashMap<>(); for (DataStreamLifecycleService lifecycleService : lifecycleServices) { DataStreamLifecycleErrorStore errorStore = lifecycleService.getErrorStore(); - Set allIndices = errorStore.getAllIndices(); + Set allIndices = errorStore.getAllIndices(Metadata.DEFAULT_PROJECT_ID); for (var index : allIndices) { - ErrorEntry error = errorStore.getError(index); + ErrorEntry error = errorStore.getError(Metadata.DEFAULT_PROJECT_ID, index); if (error != null) { indicesAndErrors.put(index, error.error()); } diff --git a/x-pack/plugin/security/src/internalClusterTest/java/org/elasticsearch/integration/DataStreamLifecycleServiceRuntimeSecurityIT.java b/x-pack/plugin/security/src/internalClusterTest/java/org/elasticsearch/integration/DataStreamLifecycleServiceRuntimeSecurityIT.java index b25ebaa463f61..612f88c3ea029 100644 --- a/x-pack/plugin/security/src/internalClusterTest/java/org/elasticsearch/integration/DataStreamLifecycleServiceRuntimeSecurityIT.java +++ b/x-pack/plugin/security/src/internalClusterTest/java/org/elasticsearch/integration/DataStreamLifecycleServiceRuntimeSecurityIT.java @@ -20,6 +20,7 @@ import org.elasticsearch.cluster.metadata.ComposableIndexTemplate; import org.elasticsearch.cluster.metadata.DataStream; import org.elasticsearch.cluster.metadata.DataStreamLifecycle; +import org.elasticsearch.cluster.metadata.Metadata; import org.elasticsearch.cluster.metadata.Template; import org.elasticsearch.common.compress.CompressedXContent; import org.elasticsearch.common.settings.Settings; @@ -170,9 +171,9 @@ private Map collectErrorsFromStoreAsMap() { Map indicesAndErrors = new HashMap<>(); for (DataStreamLifecycleService lifecycleService : lifecycleServices) { DataStreamLifecycleErrorStore errorStore = lifecycleService.getErrorStore(); - Set allIndices = errorStore.getAllIndices(); + Set allIndices = errorStore.getAllIndices(Metadata.DEFAULT_PROJECT_ID); for (var index : allIndices) { - ErrorEntry error = errorStore.getError(index); + ErrorEntry error = errorStore.getError(Metadata.DEFAULT_PROJECT_ID, index); if (error != null) { indicesAndErrors.put(index, error.error()); }