diff --git a/modules/data-streams/src/main/java/org/elasticsearch/datastreams/DataStreamsPlugin.java b/modules/data-streams/src/main/java/org/elasticsearch/datastreams/DataStreamsPlugin.java
index 7d5f4bbee32be..934c52d93cf79 100644
--- a/modules/data-streams/src/main/java/org/elasticsearch/datastreams/DataStreamsPlugin.java
+++ b/modules/data-streams/src/main/java/org/elasticsearch/datastreams/DataStreamsPlugin.java
@@ -211,7 +211,8 @@ public Collection<?> createComponents(PluginServices services) {
                 errorStoreInitialisationService.get(),
                 services.allocationService(),
                 dataStreamLifecycleErrorsPublisher.get(),
-                services.dataStreamGlobalRetentionSettings()
+                services.dataStreamGlobalRetentionSettings(),
+                services.projectResolver()
             )
         );
         dataLifecycleInitialisationService.get().init();
diff --git a/modules/data-streams/src/main/java/org/elasticsearch/datastreams/lifecycle/DataStreamLifecycleService.java b/modules/data-streams/src/main/java/org/elasticsearch/datastreams/lifecycle/DataStreamLifecycleService.java
index bf57bceab1b2f..89835e94046fa 100644
--- a/modules/data-streams/src/main/java/org/elasticsearch/datastreams/lifecycle/DataStreamLifecycleService.java
+++ b/modules/data-streams/src/main/java/org/elasticsearch/datastreams/lifecycle/DataStreamLifecycleService.java
@@ -41,6 +41,7 @@
 import org.elasticsearch.cluster.ClusterState;
 import org.elasticsearch.cluster.ClusterStateListener;
 import org.elasticsearch.cluster.ClusterStateTaskListener;
+import org.elasticsearch.cluster.ProjectState;
 import org.elasticsearch.cluster.SimpleBatchedExecutor;
 import org.elasticsearch.cluster.block.ClusterBlockLevel;
 import org.elasticsearch.cluster.metadata.DataStream;
@@ -52,8 +53,9 @@
 import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
 import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver.ResolvedExpression;
 import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver.SelectorResolver;
-import org.elasticsearch.cluster.metadata.Metadata;
 import org.elasticsearch.cluster.metadata.ProjectId;
+import org.elasticsearch.cluster.metadata.ProjectMetadata;
+import org.elasticsearch.cluster.project.ProjectResolver;
 import org.elasticsearch.cluster.routing.allocation.AllocationService;
 import org.elasticsearch.cluster.service.ClusterService;
 import org.elasticsearch.cluster.service.MasterServiceTaskQueue;
@@ -64,7 +66,6 @@
 import org.elasticsearch.common.settings.Setting;
 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.common.unit.ByteSizeValue;
-import org.elasticsearch.core.FixForMultiProject;
 import org.elasticsearch.core.Nullable;
 import org.elasticsearch.core.Strings;
 import org.elasticsearch.core.TimeValue;
@@ -165,10 +166,11 @@ public class DataStreamLifecycleService implements ClusterStateListener, Closeable, SchedulerEngine.Listener {
     private final Client client;
     private final ClusterService clusterService;
     private final ThreadPool threadPool;
-    final ResultDeduplicator<TransportRequest, Void> transportActionsDeduplicator;
-    final ResultDeduplicator<String, Void> clusterStateChangesDeduplicator;
+    final ResultDeduplicator<Tuple<ProjectId, TransportRequest>, Void> transportActionsDeduplicator;
+    final ResultDeduplicator<Tuple<ProjectId, String>, Void> clusterStateChangesDeduplicator;
     private final DataStreamLifecycleHealthInfoPublisher dslHealthInfoPublisher;
     private final DataStreamGlobalRetentionSettings globalRetentionSettings;
+    private final ProjectResolver projectResolver;
     private LongSupplier nowSupplier;
     private final Clock clock;
     private final DataStreamLifecycleErrorStore errorStore;
@@ -217,7 +219,8 @@ public DataStreamLifecycleService(
         DataStreamLifecycleErrorStore errorStore,
         AllocationService allocationService,
         DataStreamLifecycleHealthInfoPublisher dataStreamLifecycleHealthInfoPublisher,
-        DataStreamGlobalRetentionSettings globalRetentionSettings
+        DataStreamGlobalRetentionSettings globalRetentionSettings,
+        ProjectResolver projectResolver
     ) {
         this.settings = settings;
         this.client = client;
@@ -229,6 +232,7 @@ public DataStreamLifecycleService(
         this.nowSupplier = nowSupplier;
         this.errorStore = errorStore;
         this.globalRetentionSettings = globalRetentionSettings;
+        this.projectResolver = projectResolver;
         this.scheduledJob = null;
         this.pollInterval = DATA_STREAM_LIFECYCLE_POLL_INTERVAL_SETTING.get(settings);
         this.targetMergePolicyFloorSegment = DATA_STREAM_MERGE_POLICY_TARGET_FLOOR_SEGMENT_SETTING.get(settings);
@@ -345,10 +349,22 @@ void run(ClusterState state) {
             timeBetweenStarts = startTime - lastRunStartedAt;
         }
         lastRunStartedAt = startTime;
+        for (var projectId : state.metadata().projects().keySet()) {
+            // We catch inside the loop to avoid one broken project preventing DLM from running on other projects.
+            try {
+                run(state.projectState(projectId));
+            } catch (Exception e) {
+                logger.error(Strings.format("Data stream lifecycle failed to run on project [%s]", projectId), e);
+            }
+        }
+    }
+
+    private void run(ProjectState projectState) {
+        final var project = projectState.metadata();
         int affectedIndices = 0;
         int affectedDataStreams = 0;
-        for (DataStream dataStream : state.metadata().getProject().dataStreams().values()) {
-            clearErrorStoreForUnmanagedIndices(dataStream);
+        for (DataStream dataStream : project.dataStreams().values()) {
+            clearErrorStoreForUnmanagedIndices(project, dataStream);
             if (dataStream.getDataLifecycle() == null) {
                 continue;
             }
@@ -358,9 +374,9 @@ void run(ClusterState state) {
             // These are the pre-rollover write indices. They may or may not be the write index after maybeExecuteRollover has executed,
             // depending on rollover criteria, for this reason we exclude them for the remaining run.
-            indicesToExcludeForRemainingRun.add(maybeExecuteRollover(state, dataStream, false));
+            indicesToExcludeForRemainingRun.add(maybeExecuteRollover(project, dataStream, false));
             if (DataStream.isFailureStoreFeatureFlagEnabled()) {
-                Index failureStoreWriteIndex = maybeExecuteRollover(state, dataStream, true);
+                Index failureStoreWriteIndex = maybeExecuteRollover(project, dataStream, true);
                 if (failureStoreWriteIndex != null) {
                     indicesToExcludeForRemainingRun.add(failureStoreWriteIndex);
                 }
@@ -370,14 +386,14 @@ void run(ClusterState state) {
             // deleted, forcemerged, or downsampled as they're still expected to receive large amounts of writes
             indicesToExcludeForRemainingRun.addAll(
                 timeSeriesIndicesStillWithinTimeBounds(
-                    state.metadata(),
-                    getTargetIndices(dataStream, indicesToExcludeForRemainingRun, state.metadata().getProject()::index, false),
+                    project,
+                    getTargetIndices(dataStream, indicesToExcludeForRemainingRun, project::index, false),
                     nowSupplier
                 )
             );
             try {
-                indicesToExcludeForRemainingRun.addAll(maybeExecuteRetention(state, dataStream, indicesToExcludeForRemainingRun));
+                indicesToExcludeForRemainingRun.addAll(maybeExecuteRetention(project, dataStream, indicesToExcludeForRemainingRun));
             } catch (Exception e) {
                 // individual index errors would be reported via the API action listener for every delete call
                 // we could potentially record errors at a data stream level and expose it via the _data_stream API?
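The core structural change is visible above: `run(ClusterState)` now fans out over every project in the cluster state and delegates to a new per-project `run(ProjectState)`, catching exceptions per project so that one broken project cannot block the lifecycle run on the others. A minimal, self-contained sketch of that fan-out pattern (the `ProjectId` and `ProjectState` records below are illustrative stand-ins, not the Elasticsearch types):

```java
import java.util.Map;

record ProjectId(String id) {}

record ProjectState(ProjectId id) {}

class PerProjectRunner {
    // One failing project must not prevent the run on the others:
    // catch per iteration, log, and keep going.
    static void runForAllProjects(Map<ProjectId, ProjectState> projects) {
        for (Map.Entry<ProjectId, ProjectState> entry : projects.entrySet()) {
            try {
                runForProject(entry.getValue());
            } catch (Exception e) {
                System.err.printf("lifecycle run failed for project [%s]: %s%n", entry.getKey().id(), e);
            }
        }
    }

    static void runForProject(ProjectState project) {
        // project-scoped lifecycle work goes here, operating only on this project's data streams
    }
}
```

Catching inside the loop trades fail-fast behaviour for isolation, a reasonable choice for a periodically scheduled maintenance job that will retry on its next run anyway.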
@@ -393,10 +409,7 @@ void run(ClusterState state) {
             try {
                 indicesToExcludeForRemainingRun.addAll(
-                    maybeExecuteForceMerge(
-                        state,
-                        getTargetIndices(dataStream, indicesToExcludeForRemainingRun, state.metadata().getProject()::index, true)
-                    )
+                    maybeExecuteForceMerge(project, getTargetIndices(dataStream, indicesToExcludeForRemainingRun, project::index, true))
                 );
             } catch (Exception e) {
                 logger.error(
@@ -412,9 +425,9 @@ void run(ClusterState state) {
             try {
                 indicesToExcludeForRemainingRun.addAll(
                     maybeExecuteDownsampling(
-                        state,
+                        projectState,
                         dataStream,
-                        getTargetIndices(dataStream, indicesToExcludeForRemainingRun, state.metadata().getProject()::index, false)
+                        getTargetIndices(dataStream, indicesToExcludeForRemainingRun, project::index, false)
                     )
                 );
             } catch (Exception e) {
@@ -433,18 +446,19 @@ void run(ClusterState state) {
         }
         lastRunDuration = nowSupplier.getAsLong() - lastRunStartedAt;
         logger.trace(
-            "Data stream lifecycle service run for {} and performed operations on [{}] indices, part of [{}] data streams",
+            "Data stream lifecycle service ran for {} and performed operations on [{}] indices, part of [{}] data streams, in project [{}]",
             TimeValue.timeValueMillis(lastRunDuration).toHumanReadableString(2),
             affectedIndices,
-            affectedDataStreams
+            affectedDataStreams,
+            project.id()
         );
     }

     // visible for testing
-    static Set<Index> timeSeriesIndicesStillWithinTimeBounds(Metadata metadata, List<Index> targetIndices, LongSupplier nowSupplier) {
+    static Set<Index> timeSeriesIndicesStillWithinTimeBounds(ProjectMetadata project, List<Index> targetIndices, LongSupplier nowSupplier) {
         Set<Index> tsIndicesWithinBounds = new HashSet<>();
         for (Index index : targetIndices) {
-            IndexMetadata backingIndex = metadata.getProject().index(index);
+            IndexMetadata backingIndex = project.index(index);
             assert backingIndex != null : "the data stream backing indices must exist";
             if (IndexSettings.MODE.get(backingIndex.getSettings()) == IndexMode.TIME_SERIES) {
                 Instant configuredEndTime = IndexSettings.TIME_SERIES_END_TIME.get(backingIndex.getSettings());
@@ -479,15 +493,15 @@ static Set<Index> timeSeriesIndicesStillWithinTimeBounds(Metadata metadata, List<Index> targetIndices, LongSupplier nowSupplier) {
      * replacing an index in the data stream, deleting a source index, or downsampling itself) so these indices can be skipped in case
      * there are other operations to be executed by the data stream lifecycle after downsampling.
      */
-    Set<Index> maybeExecuteDownsampling(ClusterState state, DataStream dataStream, List<Index> targetIndices) {
+    Set<Index> maybeExecuteDownsampling(ProjectState projectState, DataStream dataStream, List<Index> targetIndices) {
         Set<Index> affectedIndices = new HashSet<>();
-        Metadata metadata = state.metadata();
+        final var project = projectState.metadata();
         for (Index index : targetIndices) {
-            IndexMetadata backingIndexMeta = metadata.getProject().index(index);
+            IndexMetadata backingIndexMeta = project.index(index);
             assert backingIndexMeta != null : "the data stream backing indices must exist";
             List<DataStreamLifecycle.DownsamplingRound> downsamplingRounds = dataStream.getDownsamplingRoundsFor(
                 index,
-                metadata.getProject()::index,
+                project::index,
                 nowSupplier
             );
             if (downsamplingRounds.isEmpty()) {
@@ -499,15 +513,15 @@ Set<Index> maybeExecuteDownsampling(ClusterState state, DataStream dataStream, List<Index> targetIndices) {
             // if the current index is not a downsample we want to mark the index as read-only before proceeding with downsampling
             if (org.elasticsearch.common.Strings.hasText(downsamplingSourceIndex) == false
-                && state.blocks().indexBlocked(ClusterBlockLevel.WRITE, indexName) == false) {
+                && projectState.blocks().indexBlocked(project.id(), ClusterBlockLevel.WRITE, indexName) == false) {
                 affectedIndices.add(index);
-                addIndexBlockOnce(indexName);
+                addIndexBlockOnce(project.id(), indexName);
             } else {
                 // we're not performing any operation for this index which means that it:
                 // - has matching downsample rounds
                 // - is read-only
                 // So let's wait for an in-progress downsampling operation to succeed or trigger the last matching round
-                affectedIndices.addAll(waitForInProgressOrTriggerDownsampling(dataStream, backingIndexMeta, downsamplingRounds, metadata));
+                affectedIndices.addAll(waitForInProgressOrTriggerDownsampling(dataStream, backingIndexMeta, downsamplingRounds, project));
             }
         }

@@ -524,7 +538,7 @@ private Set<Index> waitForInProgressOrTriggerDownsampling(
         DataStream dataStream,
         IndexMetadata backingIndex,
         List<DataStreamLifecycle.DownsamplingRound> downsamplingRounds,
-        Metadata metadata
+        ProjectMetadata project
     ) {
         assert dataStream.getIndices().contains(backingIndex.getIndex())
             : "the provided backing index must be part of data stream:" + dataStream.getName();
@@ -541,11 +555,12 @@ private Set<Index> waitForInProgressOrTriggerDownsampling(
                 backingIndex,
                 round.config().getFixedInterval()
             );
-            IndexMetadata targetDownsampleIndexMeta = metadata.getProject().index(downsampleIndexName);
+            IndexMetadata targetDownsampleIndexMeta = project.index(downsampleIndexName);
             boolean targetDownsampleIndexExists = targetDownsampleIndexMeta != null;

             if (targetDownsampleIndexExists) {
                 Set<Index> downsamplingNotComplete = evaluateDownsampleStatus(
+                    project.id(),
                     dataStream,
                     INDEX_DOWNSAMPLE_STATUS.get(targetDownsampleIndexMeta.getSettings()),
                     round,
@@ -562,7 +577,7 @@ private Set<Index> waitForInProgressOrTriggerDownsampling(
                     // no maintenance needed for previously started downsampling actions and we are on the last matching round so it's time
                     // to kick off downsampling
                     affectedIndices.add(index);
-                    downsampleIndexOnce(round, indexName, downsampleIndexName);
+                    downsampleIndexOnce(round, project.id(), indexName, downsampleIndexName);
                 }
             }
         }

@@ -572,7 +587,12 @@ private Set<Index> waitForInProgressOrTriggerDownsampling(

     /**
      * Issues a request to downsample the source index to the downsample index for the specified round.
      */
-    private void downsampleIndexOnce(DataStreamLifecycle.DownsamplingRound round, String sourceIndex, String downsampleIndexName) {
+    private void downsampleIndexOnce(
+        DataStreamLifecycle.DownsamplingRound round,
+        ProjectId projectId,
+        String sourceIndex,
+        String downsampleIndexName
+    ) {
         DownsampleAction.Request request = new DownsampleAction.Request(
             TimeValue.THIRTY_SECONDS /* TODO should this be longer/configurable? */,
             sourceIndex,
@@ -581,9 +601,10 @@ private void downsampleIndexOnce(DataStreamLifecycle.DownsamplingRound round, String sourceIndex, String downsampleIndexName) {
             round.config()
         );
         transportActionsDeduplicator.executeOnce(
-            request,
+            Tuple.tuple(projectId, request),
             new ErrorRecordingActionListener(
                 DownsampleAction.NAME,
+                projectId,
                 sourceIndex,
                 errorStore,
                 Strings.format(
@@ -593,7 +614,7 @@ private void downsampleIndexOnce(DataStreamLifecycle.DownsamplingRound round, String sourceIndex, String downsampleIndexName) {
                 ),
                 signallingErrorRetryInterval
             ),
-            (req, reqListener) -> downsampleIndex(request, reqListener)
+            (req, reqListener) -> downsampleIndex(projectId, request, reqListener)
         );
     }

@@ -603,6 +624,7 @@ private void downsampleIndexOnce(DataStreamLifecycle.DownsamplingRound round, String sourceIndex, String downsampleIndexName) {
      * STARTED), or replace the backing index with the downsample index in the data stream (if the status is SUCCESS).
      */
     private Set<Index> evaluateDownsampleStatus(
+        ProjectId projectId,
         DataStream dataStream,
         IndexMetadata.DownsampleTaskStatus downsampleStatus,
         DataStreamLifecycle.DownsamplingRound currentRound,
@@ -620,6 +642,7 @@ private Set<Index> evaluateDownsampleStatus(
                 // we fail now but perhaps we should just randomise the name?

                 recordAndLogError(
+                    projectId,
                     indexName,
                     errorStore,
                     new ResourceAlreadyExistsException(downsampleIndexName),
@@ -657,7 +680,7 @@ private Set<Index> evaluateDownsampleStatus(
                     // NOTE that the downsample request is made through the deduplicator so it will only really be executed if
                     // there isn't one already in-flight. This can happen if a previous request timed-out, failed, or there was a
                     // master failover and data stream lifecycle needed to restart
-                    downsampleIndexOnce(currentRound, indexName, downsampleIndexName);
+                    downsampleIndexOnce(currentRound, projectId, indexName, downsampleIndexName);
                     affectedIndices.add(backingIndex);
                     yield affectedIndices;
                 }
@@ -666,7 +689,7 @@ private Set<Index> evaluateDownsampleStatus(
                 // at this point the source index is part of the data stream and the downsample index is complete but not
                 // part of the data stream. we need to replace the source index with the downsample index in the data stream
                 affectedIndices.add(backingIndex);
-                replaceBackingIndexWithDownsampleIndexOnce(dataStream, indexName, downsampleIndexName);
+                replaceBackingIndexWithDownsampleIndexOnce(projectId, dataStream, indexName, downsampleIndexName);
             }
             yield affectedIndices;
         }
@@ -676,15 +699,21 @@ private Set<Index> evaluateDownsampleStatus(
     /**
      * Issues a request to replace the backing index with the downsample index through the cluster state changes deduplicator.
      */
-    private void replaceBackingIndexWithDownsampleIndexOnce(DataStream dataStream, String backingIndexName, String downsampleIndexName) {
+    private void replaceBackingIndexWithDownsampleIndexOnce(
+        ProjectId projectId,
+        DataStream dataStream,
+        String backingIndexName,
+        String downsampleIndexName
+    ) {
         String requestName = "dsl-replace-" + dataStream.getName() + "-" + backingIndexName + "-" + downsampleIndexName;
         clusterStateChangesDeduplicator.executeOnce(
             // we use a String key here as otherwise it's awkward as we have to create the DeleteSourceAndAddDownsampleToDS as the
             // key _without_ a listener (passing in null) and then below we create it again with the `reqListener`. We're using a String
             // as it seems to be clearer.
-            requestName,
+            Tuple.tuple(projectId, requestName),
             new ErrorRecordingActionListener(
                 requestName,
+                projectId,
                 backingIndexName,
                 errorStore,
                 Strings.format(
@@ -702,8 +731,6 @@ private void replaceBackingIndexWithDownsampleIndexOnce(
                     downsampleIndexName,
                     dataStream
                 );
-                @FixForMultiProject(description = "The correct project ID should be passed here")
-                final var projectId = ProjectId.DEFAULT;
                 swapSourceWithDownsampleIndexQueue.submitTask(
                     "data-stream-lifecycle-delete-source[" + backingIndexName + "]-add-to-datastream-[" + downsampleIndexName + "]",
                     new DeleteSourceAndAddDownsampleToDS(
@@ -723,36 +750,38 @@ private void replaceBackingIndexWithDownsampleIndexOnce(
     /**
      * Issues a request to delete the provided index through the transport action deduplicator.
      */
-    private void deleteIndexOnce(String indexName, String reason) {
+    private void deleteIndexOnce(ProjectId projectId, String indexName, String reason) {
         DeleteIndexRequest deleteIndexRequest = new DeleteIndexRequest(indexName).masterNodeTimeout(TimeValue.MAX_VALUE);
         transportActionsDeduplicator.executeOnce(
-            deleteIndexRequest,
+            Tuple.tuple(projectId, deleteIndexRequest),
             new ErrorRecordingActionListener(
                 TransportDeleteIndexAction.TYPE.name(),
+                projectId,
                 indexName,
                 errorStore,
                 Strings.format("Data stream lifecycle encountered an error trying to delete index [%s]", indexName),
                 signallingErrorRetryInterval
             ),
-            (req, reqListener) -> deleteIndex(deleteIndexRequest, reason, reqListener)
+            (req, reqListener) -> deleteIndex(projectId, deleteIndexRequest, reason, reqListener)
         );
     }

     /**
      * Issues a request to add a WRITE index block for the provided index through the transport action deduplicator.
      */
-    private void addIndexBlockOnce(String indexName) {
+    private void addIndexBlockOnce(ProjectId projectId, String indexName) {
         AddIndexBlockRequest addIndexBlockRequest = new AddIndexBlockRequest(WRITE, indexName).masterNodeTimeout(TimeValue.MAX_VALUE);
         transportActionsDeduplicator.executeOnce(
-            addIndexBlockRequest,
+            Tuple.tuple(projectId, addIndexBlockRequest),
             new ErrorRecordingActionListener(
                 TransportAddIndexBlockAction.TYPE.name(),
+                projectId,
                 indexName,
                 errorStore,
                 Strings.format("Data stream lifecycle service encountered an error trying to mark index [%s] as readonly", indexName),
                 signallingErrorRetryInterval
             ),
-            (req, reqListener) -> addIndexBlock(addIndexBlockRequest, reqListener)
+            (req, reqListener) -> addIndexBlock(projectId, addIndexBlockRequest, reqListener)
         );
     }
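Every transport action and cluster state change above goes through a result deduplicator, and this change rewrites the dedup keys from the bare request (or request name) to `Tuple.tuple(projectId, request)`. Without the project id in the key, identical requests originating from two different projects, for example delete requests for two indices that happen to share a name, could collapse into a single in-flight entry and one project's work would silently be skipped. A rough, self-contained sketch of the keying idea (simplified and synchronous; the real `ResultDeduplicator` tracks listeners per key, and this `DedupKey` record merely stands in for `Tuple<ProjectId, TransportRequest>`):

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

record ProjectId(String id) {}

// Records get value-based equals/hashCode, so two keys match only when both the
// project and the request compare equal (the request type must implement equals).
record DedupKey(ProjectId project, Object request) {}

class Deduplicator {
    private final Map<DedupKey, Boolean> inFlight = new ConcurrentHashMap<>();

    // Runs the action only if no identical (project, request) pair is already in flight.
    void executeOnce(DedupKey key, Runnable action) {
        if (inFlight.putIfAbsent(key, Boolean.TRUE) == null) {
            try {
                action.run();
            } finally {
                inFlight.remove(key);
            }
        }
    }
}
```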
@@ -788,37 +817,35 @@ static List<Index> getTargetIndices(
      * This clears the error store for the case where a data stream or some backing indices were managed by data stream lifecycle, failed in
      * their lifecycle execution, and then they were not managed by the data stream lifecycle (maybe they were switched to ILM).
      */
-    private void clearErrorStoreForUnmanagedIndices(DataStream dataStream) {
-        Metadata metadata = clusterService.state().metadata();
-        final var projectId = metadata.getProject().id();
-        for (String indexName : errorStore.getAllIndices(projectId)) {
-            IndexAbstraction indexAbstraction = metadata.getProject().getIndicesLookup().get(indexName);
+    private void clearErrorStoreForUnmanagedIndices(ProjectMetadata project, DataStream dataStream) {
+        for (String indexName : errorStore.getAllIndices(project.id())) {
+            IndexAbstraction indexAbstraction = project.getIndicesLookup().get(indexName);
             DataStream parentDataStream = indexAbstraction != null ? indexAbstraction.getParentDataStream() : null;
             if (indexAbstraction == null || parentDataStream == null) {
                 logger.trace(
                     "Clearing recorded error for index [{}] because the index doesn't exist or is not a data stream backing index anymore",
                     indexName
                 );
-                errorStore.clearRecordedError(projectId, indexName);
+                errorStore.clearRecordedError(project.id(), indexName);
             } else if (parentDataStream.getName().equals(dataStream.getName())) {
                 // we're only verifying the indices that pertain to this data stream
-                IndexMetadata indexMeta = metadata.getProject().index(indexName);
-                if (dataStream.isIndexManagedByDataStreamLifecycle(indexMeta.getIndex(), metadata.getProject()::index) == false) {
+                IndexMetadata indexMeta = project.index(indexName);
+                if (dataStream.isIndexManagedByDataStreamLifecycle(indexMeta.getIndex(), project::index) == false) {
                     logger.trace("Clearing recorded error for index [{}] because the index is not managed by DSL anymore", indexName);
-                    errorStore.clearRecordedError(projectId, indexName);
+                    errorStore.clearRecordedError(project.id(), indexName);
                 }
             }
         }
     }

     @Nullable
-    private Index maybeExecuteRollover(ClusterState state, DataStream dataStream, boolean rolloverFailureStore) {
+    private Index maybeExecuteRollover(ProjectMetadata project, DataStream dataStream, boolean rolloverFailureStore) {
         Index currentRunWriteIndex = rolloverFailureStore ? dataStream.getWriteFailureIndex() : dataStream.getWriteIndex();
         if (currentRunWriteIndex == null) {
             return null;
         }
         try {
-            if (dataStream.isIndexManagedByDataStreamLifecycle(currentRunWriteIndex, state.metadata().getProject()::index)) {
+            if (dataStream.isIndexManagedByDataStreamLifecycle(currentRunWriteIndex, project::index)) {
                 DataStreamLifecycle lifecycle = rolloverFailureStore ? dataStream.getFailuresLifecycle() : dataStream.getDataLifecycle();
                 RolloverRequest rolloverRequest = getDefaultRolloverRequest(
                     rolloverConfiguration,
@@ -827,9 +854,10 @@ private Index maybeExecuteRollover(ClusterState state, DataStream dataStream, boolean rolloverFailureStore) {
                     rolloverFailureStore
                 );
                 transportActionsDeduplicator.executeOnce(
-                    rolloverRequest,
+                    Tuple.tuple(project.id(), rolloverRequest),
                     new ErrorRecordingActionListener(
                         RolloverAction.NAME,
+                        project.id(),
                         currentRunWriteIndex.getName(),
                         errorStore,
                         Strings.format(
@@ -839,7 +867,7 @@ private Index maybeExecuteRollover(ClusterState state, DataStream dataStream, boolean rolloverFailureStore) {
                         ),
                         signallingErrorRetryInterval
                     ),
-                    (req, reqListener) -> rolloverDataStream(currentRunWriteIndex.getName(), rolloverRequest, reqListener)
+                    (req, reqListener) -> rolloverDataStream(project.id(), currentRunWriteIndex.getName(), rolloverRequest, reqListener)
                 );
             }
         } catch (Exception e) {
@@ -852,12 +880,13 @@ private Index maybeExecuteRollover(ClusterState state, DataStream dataStream, boolean rolloverFailureStore) {
                 ),
                 e
             );
-            DataStream latestDataStream = clusterService.state().metadata().getProject().dataStreams().get(dataStream.getName());
+            ProjectMetadata latestProject = clusterService.state().metadata().projects().get(project.id());
+            DataStream latestDataStream = latestProject == null ? null : latestProject.dataStreams().get(dataStream.getName());
             if (latestDataStream != null) {
                 if (latestDataStream.getWriteIndex().getName().equals(currentRunWriteIndex.getName())) {
                     // data stream has not been rolled over in the meantime so record the error against the write index we
                     // attempted the rollover
-                    errorStore.recordError(clusterService.state().metadata().getProject().id(), currentRunWriteIndex.getName(), e);
+                    errorStore.recordError(project.id(), currentRunWriteIndex.getName(), e);
                 }
             }
         }
@@ -868,21 +897,20 @@ private Index maybeExecuteRollover(ClusterState state, DataStream dataStream, boolean rolloverFailureStore) {
      * This method sends requests to delete any indices in the datastream that exceed its retention policy. It returns the set of indices
      * it has sent delete requests for.
      *
-     * @param state The cluster state from which to get index metadata
+     * @param project The project metadata from which to get index metadata
      * @param dataStream The data stream
      * @param indicesToExcludeForRemainingRun Indices to exclude from retention even if it would be time for them to be deleted
      * @return The set of indices that delete requests have been sent for
      */
-    Set<Index> maybeExecuteRetention(ClusterState state, DataStream dataStream, Set<Index> indicesToExcludeForRemainingRun) {
-        Metadata metadata = state.metadata();
+    Set<Index> maybeExecuteRetention(ProjectMetadata project, DataStream dataStream, Set<Index> indicesToExcludeForRemainingRun) {
         DataStreamGlobalRetention globalRetention = dataStream.isSystem() ? null : globalRetentionSettings.get();
         List<Index> backingIndicesOlderThanRetention = dataStream.getBackingIndicesPastRetention(
-            metadata.getProject()::index,
+            project::index,
             nowSupplier,
             globalRetention
         );
         List<Index> failureIndicesOlderThanRetention = dataStream.getFailureIndicesPastRetention(
-            metadata.getProject()::index,
+            project::index,
             nowSupplier,
             globalRetention
         );
@@ -895,7 +923,7 @@ Set<Index> maybeExecuteRetention(ClusterState state, DataStream dataStream, Set<Index> indicesToExcludeForRemainingRun) {
             TimeValue dataRetention = dataStream.getDataLifecycle().getEffectiveDataRetention(globalRetention, dataStream.isInternal());
             for (Index index : backingIndicesOlderThanRetention) {
                 if (indicesToExcludeForRemainingRun.contains(index) == false) {
-                    IndexMetadata backingIndex = metadata.getProject().index(index);
+                    IndexMetadata backingIndex = project.index(index);
                     assert backingIndex != null : "the data stream backing indices must exist";
                     IndexMetadata.DownsampleTaskStatus downsampleStatus = INDEX_DOWNSAMPLE_STATUS.get(backingIndex.getSettings());
@@ -919,7 +947,7 @@ Set<Index> maybeExecuteRetention(ClusterState state, DataStream dataStream, Set<Index> indicesToExcludeForRemainingRun) {
                     // there's an opportunity here to batch the delete requests (i.e. delete 100 indices / request)
                     // let's start simple and reevaluate
                     String indexName = backingIndex.getIndex().getName();
-                    deleteIndexOnce(indexName, "the lapsed [" + dataRetention + "] retention period");
+                    deleteIndexOnce(project.id(), indexName, "the lapsed [" + dataRetention + "] retention period");
                 }
             }
         }
@@ -929,13 +957,13 @@ Set<Index> maybeExecuteRetention(ClusterState state, DataStream dataStream, Set<Index> indicesToExcludeForRemainingRun) {
             var failureRetention = dataStream.getFailuresLifecycle().getEffectiveDataRetention(globalRetention, dataStream.isInternal());
             for (Index index : failureIndicesOlderThanRetention) {
                 if (indicesToExcludeForRemainingRun.contains(index) == false) {
-                    IndexMetadata failureIndex = metadata.getProject().index(index);
+                    IndexMetadata failureIndex = project.index(index);
                     assert failureIndex != null : "the data stream failure indices must exist";
                     indicesToBeRemoved.add(index);
                     // there's an opportunity here to batch the delete requests (i.e. delete 100 indices / request)
                     // let's start simple and reevaluate
                     String indexName = failureIndex.getIndex().getName();
-                    deleteIndexOnce(indexName, "the lapsed [" + failureRetention + "] retention period");
+                    deleteIndexOnce(project.id(), indexName, "the lapsed [" + failureRetention + "] retention period");
                 }
             }
         }
@@ -946,11 +974,10 @@ Set<Index> maybeExecuteRetention(ClusterState state, DataStream dataStream, Set<Index> indicesToExcludeForRemainingRun) {
      * This method force merges the given indices in the datastream. It writes a timestamp in the cluster state upon completion of the
      * force merge.
      */
-    private Set<Index> maybeExecuteForceMerge(ClusterState state, List<Index> indices) {
-        Metadata metadata = state.metadata();
+    private Set<Index> maybeExecuteForceMerge(ProjectMetadata project, List<Index> indices) {
         Set<Index> affectedIndices = new HashSet<>();
         for (Index index : indices) {
-            IndexMetadata backingIndex = metadata.getProject().index(index);
+            IndexMetadata backingIndex = project.index(index);
             assert backingIndex != null : "the data stream backing indices must exist";
             String indexName = index.getName();
             boolean alreadyForceMerged = isForceMergeComplete(backingIndex);
@@ -975,9 +1002,10 @@ private Set<Index> maybeExecuteForceMerge(ClusterState state, List<Index> indices) {
                 updateMergePolicySettingsRequest.masterNodeTimeout(TimeValue.MAX_VALUE);
                 affectedIndices.add(index);
                 transportActionsDeduplicator.executeOnce(
-                    updateMergePolicySettingsRequest,
+                    Tuple.tuple(project.id(), updateMergePolicySettingsRequest),
                     new ErrorRecordingActionListener(
                         TransportUpdateSettingsAction.TYPE.name(),
+                        project.id(),
                         indexName,
                         errorStore,
                         Strings.format(
@@ -987,16 +1015,17 @@ private Set<Index> maybeExecuteForceMerge(ClusterState state, List<Index> indices) {
                         ),
                         signallingErrorRetryInterval
                     ),
-                    (req, reqListener) -> updateIndexSetting(updateMergePolicySettingsRequest, reqListener)
+                    (req, reqListener) -> updateIndexSetting(project.id(), updateMergePolicySettingsRequest, reqListener)
                 );
             } else {
                 affectedIndices.add(index);
                 ForceMergeRequest forceMergeRequest = new ForceMergeRequest(indexName);
                 // time to force merge the index
                 transportActionsDeduplicator.executeOnce(
-                    new ForceMergeRequestWrapper(forceMergeRequest),
+                    Tuple.tuple(project.id(), new ForceMergeRequestWrapper(forceMergeRequest)),
                     new ErrorRecordingActionListener(
                         ForceMergeAction.NAME,
+                        project.id(),
                         indexName,
                         errorStore,
                         Strings.format(
@@ -1006,21 +1035,26 @@ private Set<Index> maybeExecuteForceMerge(ClusterState state, List<Index> indices) {
                         ),
                         signallingErrorRetryInterval
                     ),
-                    (req, reqListener) -> forceMergeIndex(forceMergeRequest, reqListener)
+                    (req, reqListener) -> forceMergeIndex(project.id(), forceMergeRequest, reqListener)
                 );
             }
         }
         return affectedIndices;
     }

-    private void rolloverDataStream(String writeIndexName, RolloverRequest rolloverRequest, ActionListener<Void> listener) {
+    private void rolloverDataStream(
+        ProjectId projectId,
+        String writeIndexName,
+        RolloverRequest rolloverRequest,
+        ActionListener<Void> listener
+    ) {
         // "saving" the rollover target name here so we don't capture the entire request
         ResolvedExpression resolvedRolloverTarget = SelectorResolver.parseExpression(
             rolloverRequest.getRolloverTarget(),
             rolloverRequest.indicesOptions()
         );
         logger.trace("Data stream lifecycle issues rollover request for data stream [{}]", rolloverRequest.getRolloverTarget());
-        client.admin().indices().rolloverIndex(rolloverRequest, new ActionListener<>() {
+        projectResolver.projectClient(client, projectId).admin().indices().rolloverIndex(rolloverRequest, new ActionListener<>() {
             @Override
             public void onResponse(RolloverResponse rolloverResponse) {
                 // Log only when the conditions were met and the index was rolled over.
@@ -1044,7 +1078,8 @@ public void onResponse(RolloverResponse rolloverResponse) {
             @Override
             public void onFailure(Exception e) {
-                DataStream dataStream = clusterService.state().metadata().getProject().dataStreams().get(resolvedRolloverTarget.resource());
+                ProjectMetadata latestProject = clusterService.state().metadata().projects().get(projectId);
+                DataStream dataStream = latestProject == null ? null : latestProject.dataStreams().get(resolvedRolloverTarget.resource());
                 if (dataStream == null || dataStream.getWriteIndex().getName().equals(writeIndexName) == false) {
                     // the data stream has another write index so no point in recording an error for the previous write index we were
                     // attempting to roll over
@@ -1060,7 +1095,7 @@ public void onFailure(Exception e) {
         });
     }

-    private void updateIndexSetting(UpdateSettingsRequest updateSettingsRequest, ActionListener<Void> listener) {
+    private void updateIndexSetting(ProjectId projectId, UpdateSettingsRequest updateSettingsRequest, ActionListener<Void> listener) {
         assert updateSettingsRequest.indices() != null && updateSettingsRequest.indices().length == 1
             : "Data stream lifecycle service updates the settings for one index at a time";
         // "saving" the index name here so we don't capture the entire request
@@ -1070,7 +1105,7 @@ private void updateIndexSetting(UpdateSettingsRequest updateSettingsRequest, ActionListener<Void> listener) {
             updateSettingsRequest.settings().keySet(),
             targetIndex
         );
-        client.admin().indices().updateSettings(updateSettingsRequest, new ActionListener<>() {
+        projectResolver.projectClient(client, projectId).admin().indices().updateSettings(updateSettingsRequest, new ActionListener<>() {
             @Override
             public void onResponse(AcknowledgedResponse acknowledgedResponse) {
                 logger.info(
@@ -1086,7 +1121,7 @@ public void onFailure(Exception e) {
                 if (e instanceof IndexNotFoundException) {
                     // index was already deleted, treat this as a success
                     logger.trace("Clearing recorded error for index [{}] because the index was deleted", targetIndex);
-                    errorStore.clearRecordedError(clusterService.state().metadata().getProject().id(), targetIndex);
+                    errorStore.clearRecordedError(projectId, targetIndex);
                     listener.onResponse(null);
                     return;
                 }
@@ -1096,7 +1131,7 @@
         });
     }

-    private void addIndexBlock(AddIndexBlockRequest addIndexBlockRequest, ActionListener<Void> listener) {
+    private void addIndexBlock(ProjectId projectId, AddIndexBlockRequest addIndexBlockRequest, ActionListener<Void> listener) {
         assert addIndexBlockRequest.indices() != null && addIndexBlockRequest.indices().length == 1
             : "Data stream lifecycle service updates the index block for one index at a time";
         // "saving" the index name here so we don't capture the entire request
@@ -1106,7 +1141,7 @@ private void addIndexBlock(AddIndexBlockRequest addIndexBlockRequest, ActionListener<Void> listener) {
             addIndexBlockRequest.getBlock(),
             targetIndex
         );
-        client.admin().indices().addBlock(addIndexBlockRequest, new ActionListener<>() {
+        projectResolver.projectClient(client, projectId).admin().indices().addBlock(addIndexBlockRequest, new ActionListener<>() {
             @Override
             public void onResponse(AddIndexBlockResponse addIndexBlockResponse) {
                 if (addIndexBlockResponse.isAcknowledged()) {
@@ -1169,7 +1204,7 @@ public void onFailure(Exception e) {
                 if (e instanceof IndexNotFoundException) {
                     // index was already deleted, treat this as a success
                     logger.trace("Clearing recorded error for index [{}] because the index was deleted", targetIndex);
-                    errorStore.clearRecordedError(clusterService.state().metadata().getProject().id(), targetIndex);
+                    errorStore.clearRecordedError(projectId, targetIndex);
                     listener.onResponse(null);
                     return;
                 }
@@ -1179,13 +1214,13 @@
         });
     }

-    private void deleteIndex(DeleteIndexRequest deleteIndexRequest, String reason, ActionListener<Void> listener) {
+    private void deleteIndex(ProjectId projectId, DeleteIndexRequest deleteIndexRequest, String reason, ActionListener<Void> listener) {
         assert deleteIndexRequest.indices() != null && deleteIndexRequest.indices().length == 1
             : "Data stream lifecycle deletes one index at a time";
         // "saving" the index name here so we don't capture the entire request
         String targetIndex = deleteIndexRequest.indices()[0];
         logger.trace("Data stream lifecycle issues request to delete index [{}]", targetIndex);
-        client.admin().indices().delete(deleteIndexRequest, new ActionListener<>() {
+        projectResolver.projectClient(client, projectId).admin().indices().delete(deleteIndexRequest, new ActionListener<>() {
             @Override
             public void onResponse(AcknowledgedResponse acknowledgedResponse) {
                 if (acknowledgedResponse.isAcknowledged()) {
@@ -1205,7 +1240,7 @@ public void onFailure(Exception e) {
                 if (e instanceof IndexNotFoundException) {
                     logger.trace("Data stream lifecycle did not delete index [{}] as it was already deleted", targetIndex);
                     // index was already deleted, treat this as a success
-                    errorStore.clearRecordedError(clusterService.state().metadata().getProject().id(), targetIndex);
+                    errorStore.clearRecordedError(projectId, targetIndex);
                     listener.onResponse(null);
                     return;
                 }
@@ -1222,11 +1257,11 @@
         });
     }

-    private void downsampleIndex(DownsampleAction.Request request, ActionListener<Void> listener) {
+    private void downsampleIndex(ProjectId projectId, DownsampleAction.Request request, ActionListener<Void> listener) {
         String sourceIndex = request.getSourceIndex();
         String downsampleIndex = request.getTargetIndex();
         logger.info("Data stream lifecycle issuing request to downsample index [{}] to index [{}]", sourceIndex, downsampleIndex);
-        client.execute(DownsampleAction.INSTANCE, request, new ActionListener<>() {
+        projectResolver.projectClient(client, projectId).execute(DownsampleAction.INSTANCE, request, new ActionListener<>() {
             @Override
             public void onResponse(AcknowledgedResponse acknowledgedResponse) {
                 assert acknowledgedResponse.isAcknowledged() : "the downsample response is always acknowledged";
@@ -1246,12 +1281,12 @@
      * metadata in the cluster state indicating when the force merge has completed. The listener is notified after the cluster state
      * update has been made, or when the forcemerge fails or the write to the cluster state fails.
      */
-    private void forceMergeIndex(ForceMergeRequest forceMergeRequest, ActionListener<Void> listener) {
+    private void forceMergeIndex(ProjectId projectId, ForceMergeRequest forceMergeRequest, ActionListener<Void> listener) {
         assert forceMergeRequest.indices() != null && forceMergeRequest.indices().length == 1
             : "Data stream lifecycle force merges one index at a time";
         final String targetIndex = forceMergeRequest.indices()[0];
         logger.info("Data stream lifecycle is issuing a request to force merge index [{}]", targetIndex);
-        client.admin().indices().forceMerge(forceMergeRequest, new ActionListener<>() {
+        projectResolver.projectClient(client, projectId).admin().indices().forceMerge(forceMergeRequest, new ActionListener<>() {
             @Override
             public void onResponse(BroadcastResponse forceMergeResponse) {
                 if (forceMergeResponse.getFailedShards() > 0) {
@@ -1274,7 +1309,7 @@ public void onResponse(BroadcastResponse forceMergeResponse) {
                     onFailure(new ElasticsearchException(message));
                 } else {
                     logger.info("Data stream lifecycle successfully force merged index [{}]", targetIndex);
-                    setForceMergeCompletedTimestamp(targetIndex, listener);
+                    setForceMergeCompletedTimestamp(projectId, targetIndex, listener);
                 }
             }

@@ -1290,10 +1325,10 @@ public void onFailure(Exception e) {
      * "data_stream_lifecycle" to value. The method returns immediately, but the update happens asynchronously and listener is notified on
      * success or failure.
      */
-    private void setForceMergeCompletedTimestamp(String targetIndex, ActionListener<Void> listener) {
+    private void setForceMergeCompletedTimestamp(ProjectId projectId, String targetIndex, ActionListener<Void> listener) {
         forceMergeClusterStateUpdateTaskQueue.submitTask(
             Strings.format("Adding force merge complete marker to cluster state for [%s]", targetIndex),
-            new UpdateForceMergeCompleteTask(listener, targetIndex, threadPool),
+            new UpdateForceMergeCompleteTask(listener, projectId, targetIndex, threadPool),
             null
         );
     }
@@ -1331,6 +1366,7 @@ public Long getTimeBetweenStarts() {

     static class ErrorRecordingActionListener implements ActionListener<Void> {
         private final String actionName;
+        private final ProjectId projectId;
         private final String targetIndex;
         private final DataStreamLifecycleErrorStore errorStore;
         private final String errorLogMessage;
@@ -1338,12 +1374,14 @@ static class ErrorRecordingActionListener implements ActionListener<Void> {

         ErrorRecordingActionListener(
             String actionName,
+            ProjectId projectId,
             String targetIndex,
             DataStreamLifecycleErrorStore errorStore,
             String errorLogMessage,
             int signallingErrorRetryThreshold
         ) {
             this.actionName = actionName;
+            this.projectId = projectId;
             this.targetIndex = targetIndex;
             this.errorStore = errorStore;
             this.errorLogMessage = errorLogMessage;
@@ -1353,14 +1391,12 @@ static class ErrorRecordingActionListener implements ActionListener<Void> {

         @Override
         public void onResponse(Void unused) {
             logger.trace("Clearing recorded error for index [{}] because the [{}] action was successful", targetIndex, actionName);
-            @FixForMultiProject(description = "Don't use default project ID")
-            final var projectId = Metadata.DEFAULT_PROJECT_ID;
             errorStore.clearRecordedError(projectId, targetIndex);
         }

         @Override
         public void onFailure(Exception e) {
-            recordAndLogError(targetIndex, errorStore, e, errorLogMessage, signallingErrorRetryThreshold);
+            recordAndLogError(projectId, targetIndex, errorStore, e, errorLogMessage, signallingErrorRetryThreshold);
         }
     }

@@ -1372,14 +1408,13 @@ public void onFailure(Exception e) {
      * This allows us to not spam the logs, but signal to the logs if DSL is not making progress.
      */
     static void recordAndLogError(
+        ProjectId projectId,
         String targetIndex,
         DataStreamLifecycleErrorStore errorStore,
         Exception e,
         String logMessage,
         int signallingErrorRetryThreshold
     ) {
-        @FixForMultiProject(description = "Don't use default project ID")
-        final var projectId = Metadata.DEFAULT_PROJECT_ID;
         ErrorEntry previousError = errorStore.recordError(projectId, targetIndex, e);
         ErrorEntry currentError = errorStore.getError(projectId, targetIndex);

         if (previousError == null || (currentError != null && previousError.error().equals(currentError.error()) == false)) {
@@ -1510,18 +1545,21 @@ public void setNowSupplier(LongSupplier nowSupplier) {
      */
     static class UpdateForceMergeCompleteTask implements ClusterStateTaskListener {
         private final ActionListener<Void> listener;
+        private final ProjectId projectId;
         private final String targetIndex;
         private final ThreadPool threadPool;

-        UpdateForceMergeCompleteTask(ActionListener<Void> listener, String targetIndex, ThreadPool threadPool) {
+        UpdateForceMergeCompleteTask(ActionListener<Void> listener, ProjectId projectId, String targetIndex, ThreadPool threadPool) {
             this.listener = listener;
+            this.projectId = projectId;
             this.targetIndex = targetIndex;
             this.threadPool = threadPool;
         }

         ClusterState execute(ClusterState currentState) throws Exception {
             logger.debug("Updating cluster state with force merge complete marker for {}", targetIndex);
-            IndexMetadata indexMetadata = currentState.metadata().getProject().index(targetIndex);
+            final var currentProject = currentState.metadata().getProject(projectId);
+            IndexMetadata indexMetadata = currentProject.index(targetIndex);
             Map<String, String> customMetadata = indexMetadata.getCustomData(LIFECYCLE_CUSTOM_INDEX_METADATA_KEY);
             Map<String, String> newCustomMetadata = new HashMap<>();
             if (customMetadata != null) {
@@ -1532,8 +1570,8 @@ ClusterState execute(ClusterState currentState) throws Exception {
                 LIFECYCLE_CUSTOM_INDEX_METADATA_KEY,
                 newCustomMetadata
             ).build();
-            Metadata metadata = Metadata.builder(currentState.metadata()).put(updatededIndexMetadata, true).build();
-            return ClusterState.builder(currentState).metadata(metadata).build();
+            final var updatedProject = ProjectMetadata.builder(currentProject).put(updatededIndexMetadata, true);
+            return ClusterState.builder(currentState).putProjectMetadata(updatedProject).build();
         }

         @Override
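With several projects sharing one cluster state, `UpdateForceMergeCompleteTask` above now rebuilds only the affected project's metadata via `ProjectMetadata.builder(currentProject)` and swaps it back in with `putProjectMetadata`, instead of rebuilding the whole top-level `Metadata`. A toy model of that per-project copy-on-write update (the record types are illustrative, not the Elasticsearch builders, and the "custom metadata" is reduced to a single string per index):

```java
import java.util.HashMap;
import java.util.Map;

record ProjectId(String id) {}

record ProjectMetadata(ProjectId id, Map<String, String> indices) {}

record ClusterState(Map<ProjectId, ProjectMetadata> projects) {
    // Replace a single project's metadata, leaving all other projects untouched.
    ClusterState putProject(ProjectMetadata updated) {
        Map<ProjectId, ProjectMetadata> copy = new HashMap<>(projects());
        copy.put(updated.id(), updated);
        return new ClusterState(Map.copyOf(copy));
    }
}

class ForceMergeMarker {
    // Assumes the project exists in the state, as the task runs against a live project.
    static ClusterState markComplete(ClusterState state, ProjectId projectId, String index, String timestamp) {
        ProjectMetadata current = state.projects().get(projectId);
        Map<String, String> indices = new HashMap<>(current.indices());
        indices.put(index, timestamp); // stand-in for the per-index force-merge-complete marker
        return state.putProject(new ProjectMetadata(projectId, Map.copyOf(indices)));
    }
}
```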
diff --git a/modules/data-streams/src/test/java/org/elasticsearch/datastreams/lifecycle/DataStreamLifecycleFixtures.java b/modules/data-streams/src/test/java/org/elasticsearch/datastreams/lifecycle/DataStreamLifecycleFixtures.java
index 0c64dba41c894..78bf365efa846 100644
--- a/modules/data-streams/src/test/java/org/elasticsearch/datastreams/lifecycle/DataStreamLifecycleFixtures.java
+++ b/modules/data-streams/src/test/java/org/elasticsearch/datastreams/lifecycle/DataStreamLifecycleFixtures.java
@@ -17,7 +17,6 @@
 import org.elasticsearch.cluster.metadata.DataStream;
 import org.elasticsearch.cluster.metadata.DataStreamLifecycle;
 import org.elasticsearch.cluster.metadata.IndexMetadata;
-import org.elasticsearch.cluster.metadata.Metadata;
 import org.elasticsearch.cluster.metadata.ProjectMetadata;
 import org.elasticsearch.cluster.metadata.ResettableValue;
 import org.elasticsearch.cluster.metadata.Template;
@@ -49,22 +48,6 @@
  */
 public class DataStreamLifecycleFixtures {

-    public static DataStream createDataStream(
-        Metadata.Builder builder,
-        String dataStreamName,
-        int backingIndicesCount,
-        Settings.Builder backingIndicesSettings,
-        @Nullable DataStreamLifecycle lifecycle,
-        Long now
-    ) {
-        var projectBuilder = builder.getProject(Metadata.DEFAULT_PROJECT_ID);
-        if (projectBuilder == null) {
-            projectBuilder = ProjectMetadata.builder(Metadata.DEFAULT_PROJECT_ID);
-            builder.put(projectBuilder);
-        }
-        return createDataStream(projectBuilder, dataStreamName, backingIndicesCount, 0, backingIndicesSettings, lifecycle, now);
-    }
-
     public static DataStream createDataStream(
         ProjectMetadata.Builder builder,
         String dataStreamName,
@@ -76,31 +59,6 @@ public static DataStream createDataStream(
         return createDataStream(builder, dataStreamName, backingIndicesCount, 0, backingIndicesSettings, lifecycle, now);
     }

-    public static DataStream createDataStream(
-        Metadata.Builder builder,
-        String dataStreamName,
-        int backingIndicesCount,
-        int failureIndicesCount,
-        Settings.Builder backingIndicesSettings,
-        @Nullable DataStreamLifecycle lifecycle,
-        Long now
-    ) {
-        var projectBuilder = builder.getProject(Metadata.DEFAULT_PROJECT_ID);
-        if (projectBuilder == null) {
-            projectBuilder = ProjectMetadata.builder(Metadata.DEFAULT_PROJECT_ID);
-            builder.put(projectBuilder);
-        }
-        return createDataStream(
-            projectBuilder,
-            dataStreamName,
-            backingIndicesCount,
-            failureIndicesCount,
-            backingIndicesSettings,
-            lifecycle,
-            now
-        );
-    }
-
     public static DataStream createDataStream(
         ProjectMetadata.Builder builder,
         String dataStreamName,
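The test updates below all follow one pattern: build the `ProjectMetadata` under `randomProjectIdOrDefault()` instead of the default project, attach it with `putProjectMetadata`, and assert against the error store keyed by that project id (`builder.getId()`). That only works if the error store is partitioned per project. A sketch of such a project-scoped store (illustrative; the real `DataStreamLifecycleErrorStore` records richer `ErrorEntry` values rather than bare exceptions):

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

record ProjectId(String id) {}

class ErrorStore {
    // Errors are keyed by (project id, index name) so one project's errors
    // are invisible to, and cannot be cleared by, another project's run.
    private final Map<ProjectId, Map<String, Exception>> errors = new ConcurrentHashMap<>();

    void recordError(ProjectId project, String index, Exception e) {
        errors.computeIfAbsent(project, p -> new ConcurrentHashMap<>()).put(index, e);
    }

    Exception getError(ProjectId project, String index) {
        Map<String, Exception> perProject = errors.get(project);
        return perProject == null ? null : perProject.get(index);
    }

    void clearRecordedError(ProjectId project, String index) {
        Map<String, Exception> perProject = errors.get(project);
        if (perProject != null) {
            perProject.remove(index);
        }
    }
}
```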
randomAlphaOfLength(10).toLowerCase(Locale.ROOT); int numBackingIndices = 3; - Metadata.Builder builder = Metadata.builder(); + ProjectMetadata.Builder builder = ProjectMetadata.builder(randomProjectIdOrDefault()); DataStream dataStream = createDataStream( builder, dataStreamName, @@ -210,7 +211,7 @@ public void testOperationsExecutedOnce() { ); builder.put(dataStream); - ClusterState state = ClusterState.builder(ClusterName.DEFAULT).metadata(builder).build(); + ClusterState state = ClusterState.builder(ClusterName.DEFAULT).putProjectMetadata(builder).build(); dataStreamLifecycleService.run(state); assertThat(clientSeenRequests.size(), is(5)); @@ -240,7 +241,7 @@ public void testOperationsExecutedOnce() { public void testRetentionNotConfigured() { String dataStreamName = randomAlphaOfLength(10).toLowerCase(Locale.ROOT); int numBackingIndices = 3; - Metadata.Builder builder = Metadata.builder(); + ProjectMetadata.Builder builder = ProjectMetadata.builder(randomProjectIdOrDefault()); DataStream dataStream = createDataStream( builder, dataStreamName, @@ -251,7 +252,7 @@ public void testRetentionNotConfigured() { ); builder.put(dataStream); - ClusterState state = ClusterState.builder(ClusterName.DEFAULT).metadata(builder).build(); + ClusterState state = ClusterState.builder(ClusterName.DEFAULT).putProjectMetadata(builder).build(); dataStreamLifecycleService.run(state); assertThat(clientSeenRequests.size(), is(3)); // rollover the write index, and force merge the other two assertThat(clientSeenRequests.get(0), instanceOf(RolloverRequest.class)); @@ -261,7 +262,7 @@ public void testRetentionNotExecutedDueToAge() { String dataStreamName = randomAlphaOfLength(10).toLowerCase(Locale.ROOT); int numBackingIndices = 3; int numFailureIndices = 2; - Metadata.Builder builder = Metadata.builder(); + ProjectMetadata.Builder builder = ProjectMetadata.builder(randomProjectIdOrDefault()); DataStream dataStream = createDataStream( builder, dataStreamName, @@ -273,7 +274,7 @@ public void testRetentionNotExecutedDueToAge() { ); builder.put(dataStream); - ClusterState state = ClusterState.builder(ClusterName.DEFAULT).metadata(builder).build(); + ClusterState state = ClusterState.builder(ClusterName.DEFAULT).putProjectMetadata(builder).build(); dataStreamLifecycleService.run(state); assertThat(clientSeenRequests.size(), is(5)); // roll over the 2 write indices, and force merge the other three assertThat(clientSeenRequests.get(0), instanceOf(RolloverRequest.class)); @@ -290,12 +291,14 @@ public void testRetentionNotExecutedForTSIndicesWithinTimeBounds() { Instant start3 = currentTime.plus(2, ChronoUnit.HOURS); Instant end3 = currentTime.plus(4, ChronoUnit.HOURS); + final var projectId = randomProjectIdOrDefault(); String dataStreamName = "logs_my-app_prod"; var clusterState = DataStreamTestHelper.getClusterStateWithDataStream( + projectId, dataStreamName, List.of(Tuple.tuple(start1, end1), Tuple.tuple(start2, end2), Tuple.tuple(start3, end3)) ); - Metadata.Builder builder = Metadata.builder(clusterState.metadata()); + ProjectMetadata.Builder builder = ProjectMetadata.builder(clusterState.metadata().getProject(projectId)); DataStream dataStream = builder.dataStream(dataStreamName); builder.put( dataStream.copy() @@ -304,7 +307,7 @@ public void testRetentionNotExecutedForTSIndicesWithinTimeBounds() { .setLifecycle(DataStreamLifecycle.builder().dataRetention(TimeValue.ZERO).build()) .build() ); - clusterState = ClusterState.builder(clusterState).metadata(builder).build(); + clusterState = 
ClusterState.builder(clusterState).putProjectMetadata(builder).build(); dataStreamLifecycleService.run(clusterState); assertThat(clientSeenRequests.size(), is(2)); // rollover the write index and one delete request for the index that's out of the @@ -326,12 +329,14 @@ public void testMergePolicyNotExecutedForTSIndicesWithinTimeBounds() { Instant start3 = currentTime.plus(2, ChronoUnit.HOURS); Instant end3 = currentTime.plus(4, ChronoUnit.HOURS); + final var projectId = randomProjectIdOrDefault(); String dataStreamName = "logs_my-app_prod"; var clusterState = DataStreamTestHelper.getClusterStateWithDataStream( + projectId, dataStreamName, List.of(Tuple.tuple(start1, end1), Tuple.tuple(start2, end2), Tuple.tuple(start3, end3)) ); - Metadata.Builder builder = Metadata.builder(clusterState.metadata()); + ProjectMetadata.Builder builder = ProjectMetadata.builder(clusterState.metadata().getProject(projectId)); DataStream dataStream = builder.dataStream(dataStreamName); // Overwrite the data stream in the cluster state to set the lifecycle policy, with no retention policy (i.e. infinite retention). builder.put( @@ -341,7 +346,7 @@ public void testMergePolicyNotExecutedForTSIndicesWithinTimeBounds() { .setLifecycle(DataStreamLifecycle.builder().build()) .build() ); - clusterState = ClusterState.builder(clusterState).metadata(builder).build(); + clusterState = ClusterState.builder(clusterState).putProjectMetadata(builder).build(); dataStreamLifecycleService.run(clusterState); // There should be two client requests: one rollover, and one to update the merge policy settings. N.B. The merge policy settings @@ -367,7 +372,7 @@ public void testMergePolicyNotExecutedForTSIndicesWithinTimeBounds() { public void testRetentionSkippedWhilstDownsamplingInProgress() { String dataStreamName = randomAlphaOfLength(10).toLowerCase(Locale.ROOT); int numBackingIndices = 3; - Metadata.Builder builder = Metadata.builder(); + ProjectMetadata.Builder builder = ProjectMetadata.builder(randomProjectIdOrDefault()); DataStream dataStream = createDataStream( builder, dataStreamName, @@ -378,14 +383,13 @@ public void testRetentionSkippedWhilstDownsamplingInProgress() { ); builder.put(dataStream); - ClusterState state = ClusterState.builder(ClusterName.DEFAULT).metadata(builder).build(); + final var project = builder.build(); { - Metadata metadata = state.metadata(); - Metadata.Builder metaBuilder = Metadata.builder(metadata); + ProjectMetadata.Builder newProjectBuilder = ProjectMetadata.builder(project); String firstBackingIndex = DataStream.getDefaultBackingIndexName(dataStreamName, 1); - IndexMetadata indexMetadata = metadata.getProject().index(firstBackingIndex); + IndexMetadata indexMetadata = project.index(firstBackingIndex); IndexMetadata.Builder indexMetaBuilder = IndexMetadata.builder(indexMetadata); indexMetaBuilder.settings( Settings.builder() @@ -399,8 +403,8 @@ public void testRetentionSkippedWhilstDownsamplingInProgress() { LIFECYCLE_CUSTOM_INDEX_METADATA_KEY, Map.of(FORCE_MERGE_COMPLETED_TIMESTAMP_METADATA_KEY, String.valueOf(System.currentTimeMillis())) ); - metaBuilder.put(indexMetaBuilder); - state = ClusterState.builder(ClusterName.DEFAULT).metadata(metaBuilder).build(); + newProjectBuilder.put(indexMetaBuilder); + ClusterState state = ClusterState.builder(ClusterName.DEFAULT).putProjectMetadata(newProjectBuilder).build(); dataStreamLifecycleService.run(state); assertThat(clientSeenRequests.size(), is(2)); // rollover the write index and delete the second generation @@ -414,17 +418,16 @@ public void 
testRetentionSkippedWhilstDownsamplingInProgress() {
        {
            // a lack of downsample status (i.e. the default `UNKNOWN`) must not prevent retention
-            Metadata metadata = state.metadata();
-            Metadata.Builder metaBuilder = Metadata.builder(metadata);
+            ProjectMetadata.Builder newProjectBuilder = ProjectMetadata.builder(project);
            String firstBackingIndex = DataStream.getDefaultBackingIndexName(dataStreamName, 1);
-            IndexMetadata indexMetadata = metadata.getProject().index(firstBackingIndex);
+            IndexMetadata indexMetadata = project.index(firstBackingIndex);
            IndexMetadata.Builder indexMetaBuilder = IndexMetadata.builder(indexMetadata);
            indexMetaBuilder.settings(
                Settings.builder().put(indexMetadata.getSettings()).putNull(IndexMetadata.INDEX_DOWNSAMPLE_STATUS_KEY)
            );
-            metaBuilder.put(indexMetaBuilder);
-            state = ClusterState.builder(ClusterName.DEFAULT).metadata(metaBuilder).build();
+            newProjectBuilder.put(indexMetaBuilder);
+            ClusterState state = ClusterState.builder(ClusterName.DEFAULT).putProjectMetadata(newProjectBuilder).build();
            dataStreamLifecycleService.run(state);
            assertThat(clientSeenRequests.size(), is(3)); // rollover the write index and delete the other two generations
@@ -445,7 +448,7 @@ public void testRetentionSkippedWhilstDownsamplingInProgress() {
    public void testIlmManagedIndicesAreSkipped() {
        String dataStreamName = randomAlphaOfLength(10).toLowerCase(Locale.ROOT);
        int numBackingIndices = 3;
-        Metadata.Builder builder = Metadata.builder();
+        ProjectMetadata.Builder builder = ProjectMetadata.builder(randomProjectIdOrDefault());
        DataStream dataStream = createDataStream(
            builder,
            dataStreamName,
@@ -458,7 +461,7 @@ public void testIlmManagedIndicesAreSkipped() {
        );
        builder.put(dataStream);
-        ClusterState state = ClusterState.builder(ClusterName.DEFAULT).metadata(builder).build();
+        ClusterState state = ClusterState.builder(ClusterName.DEFAULT).putProjectMetadata(builder).build();
        dataStreamLifecycleService.run(state);
        assertThat(clientSeenRequests.isEmpty(), is(true));
    }
@@ -466,7 +469,7 @@ public void testIlmManagedIndicesAreSkipped() {
    public void testDataStreamsWithoutLifecycleAreSkipped() {
        String dataStreamName = randomAlphaOfLength(10).toLowerCase(Locale.ROOT);
        int numBackingIndices = 3;
-        Metadata.Builder builder = Metadata.builder();
+        ProjectMetadata.Builder builder = ProjectMetadata.builder(randomProjectIdOrDefault());
        DataStream dataStream = createDataStream(
            builder,
            dataStreamName,
@@ -479,7 +482,7 @@ public void testDataStreamsWithoutLifecycleAreSkipped() {
        );
        builder.put(dataStream);
-        ClusterState state = ClusterState.builder(ClusterName.DEFAULT).metadata(builder).build();
+        ClusterState state = ClusterState.builder(ClusterName.DEFAULT).putProjectMetadata(builder).build();
        dataStreamLifecycleService.run(state);
        assertThat(clientSeenRequests.isEmpty(), is(true));
    }
@@ -487,7 +490,7 @@ public void testDataStreamsWithoutLifecycleAreSkipped() {
    public void testDeletedIndicesAreRemovedFromTheErrorStore() throws IOException {
        String dataStreamName = randomAlphaOfLength(10).toLowerCase(Locale.ROOT);
        int numBackingIndices = 3;
-        Metadata.Builder builder = Metadata.builder();
+        ProjectMetadata.Builder builder = ProjectMetadata.builder(randomProjectIdOrDefault());
        DataStream dataStream = createDataStream(
            builder,
            dataStreamName,
@@ -501,12 +504,11 @@ public void testDeletedIndicesAreRemovedFromTheErrorStore() throws IOException {
        DiscoveryNodes.Builder nodesBuilder = buildNodes(nodeId);
        // we are the master node
        nodesBuilder.masterNodeId(nodeId);
-        ClusterState previousState = ClusterState.builder(ClusterName.DEFAULT).metadata(builder).nodes(nodesBuilder).build();
+        ClusterState previousState = ClusterState.builder(ClusterName.DEFAULT).putProjectMetadata(builder).nodes(nodesBuilder).build();
        // all backing indices are in the error store
        for (Index index : dataStream.getIndices()) {
-            dataStreamLifecycleService.getErrorStore()
-                .recordError(Metadata.DEFAULT_PROJECT_ID, index.getName(), new NullPointerException("bad"));
+            dataStreamLifecycleService.getErrorStore().recordError(builder.getId(), index.getName(), new NullPointerException("bad"));
        }
        Index writeIndex = dataStream.getWriteIndex();
        // all indices but the write index are deleted
@@ -514,28 +516,25 @@ public void testDeletedIndicesAreRemovedFromTheErrorStore() throws IOException {
        ClusterState.Builder newStateBuilder = ClusterState.builder(previousState);
        newStateBuilder.stateUUID(UUIDs.randomBase64UUID());
-        Metadata.Builder metaBuilder = Metadata.builder(previousState.metadata());
+        ProjectMetadata.Builder metaBuilder = ProjectMetadata.builder(previousState.metadata().getProject(builder.getId()));
        for (Index index : deletedIndices) {
            metaBuilder.remove(index.getName());
            IndexGraveyard.Builder graveyardBuilder = IndexGraveyard.builder(metaBuilder.indexGraveyard());
            graveyardBuilder.addTombstone(index);
            metaBuilder.indexGraveyard(graveyardBuilder.build());
        }
-        newStateBuilder.metadata(metaBuilder);
+        newStateBuilder.putProjectMetadata(metaBuilder);
        ClusterState stateWithDeletedIndices = newStateBuilder.nodes(buildNodes(nodeId).masterNodeId(nodeId)).build();
        setState(clusterService, stateWithDeletedIndices);
        dataStreamLifecycleService.run(stateWithDeletedIndices);
        for (Index deletedIndex : deletedIndices) {
-            assertThat(
-                dataStreamLifecycleService.getErrorStore().getError(Metadata.DEFAULT_PROJECT_ID, deletedIndex.getName()),
-                nullValue()
-            );
+            assertThat(dataStreamLifecycleService.getErrorStore().getError(builder.getId(), deletedIndex.getName()), nullValue());
        }
        // the value for the write index should still be in the error store
        assertThat(
-            dataStreamLifecycleService.getErrorStore().getError(Metadata.DEFAULT_PROJECT_ID, dataStream.getWriteIndex().getName()),
+            dataStreamLifecycleService.getErrorStore().getError(builder.getId(), dataStream.getWriteIndex().getName()),
            notNullValue()
        );
    }
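Every error-store interaction in these tests now keys on the project ID from `builder.getId()` where the old code used the single `Metadata.DEFAULT_PROJECT_ID` constant. A minimal sketch of the keying this implies, using stand-in types rather than the real `DataStreamLifecycleErrorStore` (the `ProjectId` and `ErrorEntry` records here are illustrative, not the Elasticsearch classes):

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

final class ProjectScopedErrorStoreSketch {
    record ProjectId(String id) {}
    record ErrorEntry(long recordedAtMillis, String error) {}

    // errors are bucketed per project first, then per index name
    private final Map<ProjectId, Map<String, ErrorEntry>> errors = new ConcurrentHashMap<>();

    void recordError(ProjectId project, String indexName, Exception e) {
        errors.computeIfAbsent(project, p -> new ConcurrentHashMap<>())
            .put(indexName, new ErrorEntry(System.currentTimeMillis(), e.getMessage()));
    }

    ErrorEntry getError(ProjectId project, String indexName) {
        Map<String, ErrorEntry> perProject = errors.get(project);
        return perProject == null ? null : perProject.get(indexName);
    }

    void clearRecordedError(ProjectId project, String indexName) {
        Map<String, ErrorEntry> perProject = errors.get(project);
        if (perProject != null) {
            perProject.remove(indexName);
        }
    }
}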
@@ -543,7 +542,7 @@ public void testDeletedIndicesAreRemovedFromTheErrorStore() throws IOException {
    public void testErrorStoreIsClearedOnBackingIndexBecomingUnmanaged() {
        String dataStreamName = randomAlphaOfLength(10).toLowerCase(Locale.ROOT);
        int numBackingIndices = 3;
-        Metadata.Builder builder = Metadata.builder();
+        ProjectMetadata.Builder builder = ProjectMetadata.builder(randomProjectIdOrDefault());
        DataStream dataStream = createDataStream(
            builder,
            dataStreamName,
@@ -554,35 +553,34 @@ public void testErrorStoreIsClearedOnBackingIndexBecomingUnmanaged() {
        );
        // all backing indices are in the error store
        for (Index index : dataStream.getIndices()) {
-            dataStreamLifecycleService.getErrorStore()
-                .recordError(Metadata.DEFAULT_PROJECT_ID, index.getName(), new NullPointerException("bad"));
+            dataStreamLifecycleService.getErrorStore().recordError(builder.getId(), index.getName(), new NullPointerException("bad"));
        }
        builder.put(dataStream);
-        ClusterState state = ClusterState.builder(ClusterName.DEFAULT).metadata(builder).build();
+        ClusterState state = ClusterState.builder(ClusterName.DEFAULT).putProjectMetadata(builder).build();

-        Metadata metadata = state.metadata();
-        Metadata.Builder metaBuilder = Metadata.builder(metadata);
+        final var project = state.metadata().getProject(builder.getId());
+        ProjectMetadata.Builder newBuilder = ProjectMetadata.builder(project);
        // update the backing indices to be ILM managed
        for (Index index : dataStream.getIndices()) {
-            IndexMetadata indexMetadata = metadata.getProject().index(index);
+            IndexMetadata indexMetadata = project.index(index);
            IndexMetadata.Builder indexMetaBuilder = IndexMetadata.builder(indexMetadata);
            indexMetaBuilder.settings(Settings.builder().put(indexMetadata.getSettings()).put(IndexMetadata.LIFECYCLE_NAME, "ILM_policy"));
-            metaBuilder.put(indexMetaBuilder.build(), true);
+            newBuilder.put(indexMetaBuilder.build(), true);
        }
-        ClusterState updatedState = ClusterState.builder(state).metadata(metaBuilder).build();
+        ClusterState updatedState = ClusterState.builder(state).putProjectMetadata(newBuilder).build();
        setState(clusterService, updatedState);

        dataStreamLifecycleService.run(updatedState);
        for (Index index : dataStream.getIndices()) {
-            assertThat(dataStreamLifecycleService.getErrorStore().getError(Metadata.DEFAULT_PROJECT_ID, index.getName()), nullValue());
+            assertThat(dataStreamLifecycleService.getErrorStore().getError(builder.getId(), index.getName()), nullValue());
        }
    }

    public void testBackingIndicesFromMultipleDataStreamsInErrorStore() {
        String ilmManagedDataStreamName = randomAlphaOfLength(10).toLowerCase(Locale.ROOT);
-        Metadata.Builder builder = Metadata.builder();
+        ProjectMetadata.Builder builder = ProjectMetadata.builder(randomProjectIdOrDefault());
        DataStream ilmManagedDataStream = createDataStream(
            builder,
            ilmManagedDataStreamName,
@@ -594,7 +592,7 @@ public void testBackingIndicesFromMultipleDataStreamsInErrorStore() {
        // all backing indices are in the error store
        for (Index index : ilmManagedDataStream.getIndices()) {
            dataStreamLifecycleService.getErrorStore()
-                .recordError(Metadata.DEFAULT_PROJECT_ID, index.getName(), new NullPointerException("will be ILM managed soon"));
+                .recordError(builder.getId(), index.getName(), new NullPointerException("will be ILM managed soon"));
        }
        String dataStreamWithBackingIndicesInErrorState = randomAlphaOfLength(15).toLowerCase(Locale.ROOT);
        DataStream dslManagedDataStream = createDataStream(
@@ -608,32 +606,32 @@ public void testBackingIndicesFromMultipleDataStreamsInErrorStore() {
        // put all backing indices in the error store
        for (Index index : dslManagedDataStream.getIndices()) {
            dataStreamLifecycleService.getErrorStore()
-                .recordError(Metadata.DEFAULT_PROJECT_ID, index.getName(), new NullPointerException("dsl managed index"));
+                .recordError(builder.getId(), index.getName(), new NullPointerException("dsl managed index"));
        }
        builder.put(ilmManagedDataStream);
        builder.put(dslManagedDataStream);
-        ClusterState state = ClusterState.builder(ClusterName.DEFAULT).metadata(builder).build();
+        ClusterState state = ClusterState.builder(ClusterName.DEFAULT).putProjectMetadata(builder).build();

-        Metadata metadata = state.metadata();
-        Metadata.Builder metaBuilder = Metadata.builder(metadata);
+        final var project = state.metadata().getProject(builder.getId());
+        ProjectMetadata.Builder newBuilder = ProjectMetadata.builder(project);
        // update the backing indices to be ILM managed so they should be removed from the error store on the next DSL run
        for (Index index : ilmManagedDataStream.getIndices()) {
-            IndexMetadata indexMetadata = metadata.getProject().index(index);
+            IndexMetadata indexMetadata = project.index(index);
            IndexMetadata.Builder indexMetaBuilder = IndexMetadata.builder(indexMetadata);
            indexMetaBuilder.settings(Settings.builder().put(indexMetadata.getSettings()).put(IndexMetadata.LIFECYCLE_NAME, "ILM_policy"));
-            metaBuilder.put(indexMetaBuilder.build(), true);
+            newBuilder.put(indexMetaBuilder.build(), true);
        }
-        ClusterState updatedState = ClusterState.builder(state).metadata(metaBuilder).build();
+        ClusterState updatedState = ClusterState.builder(state).putProjectMetadata(newBuilder).build();
        setState(clusterService, updatedState);

        dataStreamLifecycleService.run(updatedState);
        for (Index index : dslManagedDataStream.getIndices()) {
-            assertThat(dataStreamLifecycleService.getErrorStore().getError(Metadata.DEFAULT_PROJECT_ID, index.getName()), notNullValue());
+            assertThat(dataStreamLifecycleService.getErrorStore().getError(builder.getId(), index.getName()), notNullValue());
        }
        for (Index index : ilmManagedDataStream.getIndices()) {
-            assertThat(dataStreamLifecycleService.getErrorStore().getError(Metadata.DEFAULT_PROJECT_ID, index.getName()), nullValue());
+            assertThat(dataStreamLifecycleService.getErrorStore().getError(builder.getId(), index.getName()), nullValue());
        }
    }
@@ -647,7 +645,7 @@ public void testForceMerge() throws Exception {
        };
        String dataStreamName = randomAlphaOfLength(10).toLowerCase(Locale.ROOT);
        int numBackingIndices = 3;
-        Metadata.Builder builder = Metadata.builder();
+        ProjectMetadata.Builder builder = ProjectMetadata.builder(randomProjectIdOrDefault());
        DataStream dataStream = createDataStream(
            builder,
            dataStreamName,
@@ -663,42 +661,23 @@ public void testForceMerge() throws Exception {
        DiscoveryNodes.Builder nodesBuilder = buildNodes(nodeId);
        // we are the master node
        nodesBuilder.masterNodeId(nodeId);
-        ClusterState state = ClusterState.builder(ClusterName.DEFAULT).metadata(builder).nodes(nodesBuilder).build();
+        ClusterState state = ClusterState.builder(ClusterName.DEFAULT).putProjectMetadata(builder).nodes(nodesBuilder).build();
        setState(clusterService, state);
        dataStreamLifecycleService.run(clusterService.state());

        // There are 3 backing indices. One gets rolled over. The other two get force merged:
        assertBusy(() -> {
+            final var project = clusterService.state().metadata().getProject(builder.getId());
+            assertThat(project.index(dataStream.getIndices().get(0)).getCustomData(LIFECYCLE_CUSTOM_INDEX_METADATA_KEY), notNullValue());
+            assertThat(project.index(dataStream.getIndices().get(1)).getCustomData(LIFECYCLE_CUSTOM_INDEX_METADATA_KEY), notNullValue());
            assertThat(
-                clusterService.state()
-                    .metadata()
-                    .getProject()
-                    .index(dataStream.getIndices().get(0))
-                    .getCustomData(LIFECYCLE_CUSTOM_INDEX_METADATA_KEY),
-                notNullValue()
-            );
-            assertThat(
-                clusterService.state()
-                    .metadata()
-                    .getProject()
-                    .index(dataStream.getIndices().get(1))
-                    .getCustomData(LIFECYCLE_CUSTOM_INDEX_METADATA_KEY),
-                notNullValue()
-            );
-            assertThat(
-                clusterService.state()
-                    .metadata()
-                    .getProject()
-                    .index(dataStream.getIndices().get(0))
+                project.index(dataStream.getIndices().get(0))
                    .getCustomData(LIFECYCLE_CUSTOM_INDEX_METADATA_KEY)
                    .get(FORCE_MERGE_COMPLETED_TIMESTAMP_METADATA_KEY),
                notNullValue()
            );
            assertThat(
-                clusterService.state()
-                    .metadata()
-                    .getProject()
-                    .index(dataStream.getIndices().get(1))
+                project.index(dataStream.getIndices().get(1))
                    .getCustomData(LIFECYCLE_CUSTOM_INDEX_METADATA_KEY)
                    .get(FORCE_MERGE_COMPLETED_TIMESTAMP_METADATA_KEY),
                notNullValue()
@@ -732,31 +711,26 @@ public void testForceMerge() throws Exception {
        MaxAgeCondition rolloverCondition = new MaxAgeCondition(TimeValue.timeValueMillis(now - 2000L));
        indexMetaBuilder.putRolloverInfo(new RolloverInfo(dataStreamName, List.of(rolloverCondition), now - 2000L));
        IndexMetadata newIndexMetadata = indexMetaBuilder.build();
-        builder = Metadata.builder(clusterService.state().metadata()).put(newIndexMetadata, true);
-        state = ClusterState.builder(clusterService.state()).metadata(builder).build();
+        ProjectMetadata newProject = ProjectMetadata.builder(clusterService.state().metadata().getProject(builder.getId()))
+            .put(newIndexMetadata, true)
+            .build();
+        state = ClusterState.builder(clusterService.state()).putProjectMetadata(newProject).build();
        setState(clusterService, state);
-        DataStream dataStream2 = dataStream.addBackingIndex(clusterService.state().metadata().getProject(), newIndexMetadata.getIndex());
-        builder = Metadata.builder(clusterService.state().metadata());
-        builder.put(dataStream2);
-        state = ClusterState.builder(clusterService.state()).metadata(builder).build();
+        DataStream dataStream2 = dataStream.addBackingIndex(newProject, newIndexMetadata.getIndex());
+        newProject = ProjectMetadata.builder(newProject).put(dataStream2).build();
+        state = ClusterState.builder(clusterService.state()).putProjectMetadata(newProject).build();
        setState(clusterService, state);
        dataStreamLifecycleService.run(clusterService.state());
        assertBusy(() -> { assertThat(clientSeenRequests.size(), is(4)); });
        assertThat(((ForceMergeRequest) clientSeenRequests.get(3)).indices().length, is(1));
        assertBusy(() -> {
+            final var retrievedProject = clusterService.state().metadata().getProject(builder.getId());
            assertThat(
-                clusterService.state()
-                    .metadata()
-                    .getProject()
-                    .index(dataStream2.getIndices().get(2))
-                    .getCustomData(LIFECYCLE_CUSTOM_INDEX_METADATA_KEY),
+                retrievedProject.index(dataStream2.getIndices().get(2)).getCustomData(LIFECYCLE_CUSTOM_INDEX_METADATA_KEY),
                notNullValue()
            );
            assertThat(
-                clusterService.state()
-                    .metadata()
-                    .getProject()
-                    .index(dataStream2.getIndices().get(2))
+                retrievedProject.index(dataStream2.getIndices().get(2))
                    .getCustomData(LIFECYCLE_CUSTOM_INDEX_METADATA_KEY)
                    .get(FORCE_MERGE_COMPLETED_TIMESTAMP_METADATA_KEY),
                notNullValue()
@@ -773,7 +747,7 @@ public void testForceMergeRetries() throws Exception {
         */
        String dataStreamName = randomAlphaOfLength(10).toLowerCase(Locale.ROOT);
        int numBackingIndices = 3;
-        Metadata.Builder builder = Metadata.builder();
+        ProjectMetadata.Builder builder = ProjectMetadata.builder(randomProjectIdOrDefault());
        DataStream dataStream = createDataStream(
            builder,
            dataStreamName,
@@ -789,7 +763,7 @@ public void testForceMergeRetries() throws Exception {
        DiscoveryNodes.Builder nodesBuilder = buildNodes(nodeId);
        // we are the master node
        nodesBuilder.masterNodeId(nodeId);
-        ClusterState state = ClusterState.builder(ClusterName.DEFAULT).metadata(builder).nodes(nodesBuilder).build();
+        ClusterState state = ClusterState.builder(ClusterName.DEFAULT).putProjectMetadata(builder).nodes(nodesBuilder).build();
        setState(clusterService, state);

        {
@@ -809,14 +783,8 @@ public void testForceMergeRetries() throws Exception {
             */
            assertBusy(() -> {
                assertThat(forceMergeFailedCount.get(), equalTo(2));
-                assertThat(
-                    clusterService.state()
-                        .metadata()
-                        .getProject()
-                        .index(dataStream.getIndices().get(0))
-                        .getCustomData(LIFECYCLE_CUSTOM_INDEX_METADATA_KEY),
-                    nullValue()
-                );
+                final var project = clusterService.state().metadata().getProject(builder.getId());
+                assertThat(project.index(dataStream.getIndices().get(0)).getCustomData(LIFECYCLE_CUSTOM_INDEX_METADATA_KEY), nullValue());
            });
        }

@@ -841,14 +809,8 @@ public void testForceMergeRetries() throws Exception {
            dataStreamLifecycleService.run(clusterService.state());
            assertBusy(() -> {
                assertThat(forceMergeFailedCount.get(), equalTo(2));
-                assertThat(
-                    clusterService.state()
-                        .metadata()
-                        .getProject()
-                        .index(dataStream.getIndices().get(0))
-                        .getCustomData(LIFECYCLE_CUSTOM_INDEX_METADATA_KEY),
-                    nullValue()
-                );
+                final var project = clusterService.state().metadata().getProject(builder.getId());
+                assertThat(project.index(dataStream.getIndices().get(0)).getCustomData(LIFECYCLE_CUSTOM_INDEX_METADATA_KEY), nullValue());
            });
        }

@@ -867,14 +829,8 @@ public void testForceMergeRetries() throws Exception {
            dataStreamLifecycleService.run(clusterService.state());
            assertBusy(() -> {
                assertThat(forceMergeFailedCount.get(), equalTo(2));
-                assertThat(
-                    clusterService.state()
-                        .metadata()
-                        .getProject()
-                        .index(dataStream.getIndices().get(0))
-                        .getCustomData(LIFECYCLE_CUSTOM_INDEX_METADATA_KEY),
-                    nullValue()
-                );
+                final var project = clusterService.state().metadata().getProject(builder.getId());
+                assertThat(project.index(dataStream.getIndices().get(0)).getCustomData(LIFECYCLE_CUSTOM_INDEX_METADATA_KEY), nullValue());
            });
        }

@@ -890,36 +846,23 @@ public void testForceMergeRetries() throws Exception {
         * And this time we expect that it will actually run the forcemerge, and update the marker to complete:
         */
        assertBusy(() -> {
+            final var project = clusterService.state().metadata().getProject(builder.getId());
            assertThat(
-                clusterService.state()
-                    .metadata()
-                    .getProject()
-                    .index(dataStream.getIndices().get(0))
-                    .getCustomData(LIFECYCLE_CUSTOM_INDEX_METADATA_KEY),
+                project.index(dataStream.getIndices().get(0)).getCustomData(LIFECYCLE_CUSTOM_INDEX_METADATA_KEY),
                notNullValue()
            );
            assertThat(
-                clusterService.state()
-                    .metadata()
-                    .getProject()
-                    .index(dataStream.getIndices().get(1))
-                    .getCustomData(LIFECYCLE_CUSTOM_INDEX_METADATA_KEY),
+                project.index(dataStream.getIndices().get(1)).getCustomData(LIFECYCLE_CUSTOM_INDEX_METADATA_KEY),
                notNullValue()
            );
            assertThat(
-                clusterService.state()
-                    .metadata()
-                    .getProject()
-                    .index(dataStream.getIndices().get(0))
+                project.index(dataStream.getIndices().get(0))
                    .getCustomData(LIFECYCLE_CUSTOM_INDEX_METADATA_KEY)
                    .get(FORCE_MERGE_COMPLETED_TIMESTAMP_METADATA_KEY),
                notNullValue()
            );
            assertThat(
-                clusterService.state()
-                    .metadata()
-                    .getProject()
-                    .index(dataStream.getIndices().get(1))
+                project.index(dataStream.getIndices().get(1))
                    .getCustomData(LIFECYCLE_CUSTOM_INDEX_METADATA_KEY)
                    .get(FORCE_MERGE_COMPLETED_TIMESTAMP_METADATA_KEY),
                notNullValue()
@@ -952,20 +895,7 @@ public void testForceMergeDedup() throws Exception {
         * methods of ForceMergeRequests are interacting with the deduplicator as expected.
         */
        String dataStreamName = randomAlphaOfLength(10).toLowerCase(Locale.ROOT);
-        int numBackingIndices = 3;
-        Metadata.Builder builder = Metadata.builder();
-        DataStream dataStream = createDataStream(
-            builder,
-            dataStreamName,
-            numBackingIndices,
-            settings(IndexVersion.current()),
-            DataStreamLifecycle.builder().dataRetention(TimeValue.MAX_VALUE).build(),
-            now
-        );
-        builder.put(dataStream);
-        IndexMetadata.Builder indexMetaBuilder = IndexMetadata.builder(
-            DataStream.getDefaultBackingIndexName(dataStreamName, numBackingIndices + 1)
-        )
+        IndexMetadata.Builder indexMetaBuilder = IndexMetadata.builder(DataStream.getDefaultBackingIndexName(dataStreamName, 1))
            .settings(
                settings(IndexVersion.current()).put(MergePolicyConfig.INDEX_MERGE_POLICY_FLOOR_SEGMENT_SETTING.getKey(), ONE_HUNDRED_MB)
                    .put(MergePolicyConfig.INDEX_MERGE_POLICY_MERGE_FACTOR_SETTING.getKey(), TARGET_MERGE_FACTOR_VALUE)
@@ -976,13 +906,21 @@ public void testForceMergeDedup() throws Exception {
        MaxAgeCondition rolloverCondition = new MaxAgeCondition(TimeValue.timeValueMillis(now - 2000L));
        indexMetaBuilder.putRolloverInfo(new RolloverInfo(dataStreamName, List.of(rolloverCondition), now - 2000L));
        IndexMetadata newIndexMetadata = indexMetaBuilder.build();
-        builder = Metadata.builder(clusterService.state().metadata()).put(newIndexMetadata, true);
-        ClusterState state = ClusterState.builder(clusterService.state()).metadata(builder).build();
-        setState(clusterService, state);
-        DataStream dataStream2 = dataStream.addBackingIndex(clusterService.state().metadata().getProject(), newIndexMetadata.getIndex());
-        builder = Metadata.builder(clusterService.state().metadata());
-        builder.put(dataStream2);
-        state = ClusterState.builder(clusterService.state()).metadata(builder).build();
+
+        DataStream dataStream = DataStreamTestHelper.newInstance(
+            dataStreamName,
+            // TODO: we have to add a write index that does not exist in the metadata to make this test pass. There is probably some value
+            // in checking that the deduplicator works, but this test depends on a broken/weird cluster state and it doesn't seem to have
+            // much value in its current state anyway.
+            List.of(newIndexMetadata.getIndex(), new Index(DataStream.getDefaultBackingIndexName(dataStreamName, 2), randomUUID())),
+            1L,
+            null,
+            false,
+            DataStreamLifecycle.builder().dataRetention(TimeValue.MAX_VALUE).build()
+        );
+
+        ProjectMetadata.Builder builder = ProjectMetadata.builder(randomProjectIdOrDefault()).put(newIndexMetadata, true).put(dataStream);
+        ClusterState state = ClusterState.builder(clusterService.state()).putProjectMetadata(builder).build();
        setState(clusterService, state);
        clientDelegate = (action, request, listener) -> {
            if (action.name().equals("indices:admin/forcemerge")) {
@@ -1012,9 +950,11 @@ public void onFailure(Exception e) {
                failure.set(e);
            }
        };
+        final var projectId = randomProjectIdOrDefault();
        String targetIndex = randomAlphaOfLength(20);
        DataStreamLifecycleService.UpdateForceMergeCompleteTask task = new DataStreamLifecycleService.UpdateForceMergeCompleteTask(
            listener,
+            projectId,
            targetIndex,
            threadPool
        );
@@ -1024,9 +964,9 @@ public void onFailure(Exception e) {
        assertThat(failureCount.get(), equalTo(1));
        assertThat(onResponseCount.get(), equalTo(0));
        assertThat(failure.get(), equalTo(exception));
-        ClusterState clusterState = createClusterState(targetIndex, null);
+        ClusterState clusterState = createClusterState(projectId, targetIndex, null);
        ClusterState newClusterState = task.execute(clusterState);
-        IndexMetadata indexMetadata = newClusterState.metadata().getProject().index(targetIndex);
+        IndexMetadata indexMetadata = newClusterState.metadata().getProject(projectId).index(targetIndex);
        assertThat(indexMetadata, notNullValue());
        Map<String, String> dataStreamLifecycleMetadata = indexMetadata.getCustomData(LIFECYCLE_CUSTOM_INDEX_METADATA_KEY);
        assertThat(dataStreamLifecycleMetadata, notNullValue());
@@ -1049,9 +989,9 @@ public void onFailure(Exception e) {
            preExistingDataStreamLifecycleCustomMetadataKey,
            preExistingDataStreamLifecycleCustomMetadataValue
        );
-        ClusterState clusterState = createClusterState(targetIndex, preExistingDataStreamLifecycleCustomMetadata);
+        ClusterState clusterState = createClusterState(projectId, targetIndex, preExistingDataStreamLifecycleCustomMetadata);
        ClusterState newClusterState = task.execute(clusterState);
-        IndexMetadata indexMetadata = newClusterState.metadata().getProject().index(targetIndex);
+        IndexMetadata indexMetadata = newClusterState.metadata().getProject(projectId).index(targetIndex);
        Map<String, String> dataStreamLifecycleMetadata = indexMetadata.getCustomData(LIFECYCLE_CUSTOM_INDEX_METADATA_KEY);
        assertThat(dataStreamLifecycleMetadata, notNullValue());
        assertThat(dataStreamLifecycleMetadata.size(), equalTo(2));
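The hunks above lean on the transport-request deduplicator: a forcemerge for a key that is already in flight must not produce a second client call, which is why the equals/hashCode behaviour of the request wrapper matters. A sketch of that contract under assumed `ResultDeduplicator` semantics (the key shape and the `executeForceMerge` helper are illustrative, not taken from this change):

// Equal keys share one in-flight execution; later callers only attach listeners.
ResultDeduplicator<Tuple<ProjectId, ForceMergeRequest>, Void> deduplicator =
    new ResultDeduplicator<>(threadPool.getThreadContext());

Tuple<ProjectId, ForceMergeRequest> key = Tuple.tuple(projectId, forceMergeRequest);
deduplicator.executeOnce(key, listener, (k, l) -> {
    // invoked only for the first of any set of equal() keys
    executeForceMerge(k.v2(), l); // illustrative helper, not a real API
});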
@@ -1145,7 +1085,7 @@ public void testForceMergeRequestWrapperEqualsHashCode() {
    public void testMergePolicySettingsAreConfiguredBeforeForcemerge() throws Exception {
        String dataStreamName = randomAlphaOfLength(10).toLowerCase(Locale.ROOT);
        int numBackingIndices = 3;
-        Metadata.Builder builder = Metadata.builder();
+        ProjectMetadata.Builder builder = ProjectMetadata.builder(randomProjectIdOrDefault());
        DataStream dataStream = createDataStream(
            builder,
            dataStreamName,
@@ -1160,7 +1100,7 @@ public void testMergePolicySettingsAreConfiguredBeforeForcemerge() throws Except
        DiscoveryNodes.Builder nodesBuilder = buildNodes(nodeId);
        // we are the master node
        nodesBuilder.masterNodeId(nodeId);
-        ClusterState state = ClusterState.builder(ClusterName.DEFAULT).metadata(builder).nodes(nodesBuilder).build();
+        ClusterState state = ClusterState.builder(ClusterName.DEFAULT).putProjectMetadata(builder).nodes(nodesBuilder).build();
        setState(clusterService, state);

        dataStreamLifecycleService.run(clusterService.state());
@@ -1205,16 +1145,15 @@ public void testMergePolicySettingsAreConfiguredBeforeForcemerge() throws Except
        MaxAgeCondition rolloverCondition = new MaxAgeCondition(TimeValue.timeValueMillis(now - 2000L));
        indexMetaBuilder.putRolloverInfo(new RolloverInfo(dataStreamName, List.of(rolloverCondition), now - 2000L));
        IndexMetadata newIndexMetadata = indexMetaBuilder.build();
-        builder = Metadata.builder(clusterService.state().metadata()).put(newIndexMetadata, true);
-        state = ClusterState.builder(clusterService.state()).metadata(builder).build();
+        builder = ProjectMetadata.builder(clusterService.state().metadata().getProject(builder.getId())).put(newIndexMetadata, true);
+        state = ClusterState.builder(clusterService.state()).putProjectMetadata(builder).build();
        setState(clusterService, state);
        DataStream modifiedDataStream = dataStream.addBackingIndex(
-            clusterService.state().metadata().getProject(),
+            clusterService.state().metadata().getProject(builder.getId()),
            newIndexMetadata.getIndex()
        );
-        builder = Metadata.builder(clusterService.state().metadata());
-        builder.put(modifiedDataStream);
-        state = ClusterState.builder(clusterService.state()).metadata(builder).build();
+        builder = ProjectMetadata.builder(clusterService.state().metadata().getProject(builder.getId())).put(modifiedDataStream);
+        state = ClusterState.builder(clusterService.state()).putProjectMetadata(builder).build();
        setState(clusterService, state);
        dataStreamLifecycleService.run(clusterService.state());
        assertBusy(() -> assertThat(clientSeenRequests.size(), is(4)));
@@ -1224,7 +1163,8 @@ public void testMergePolicySettingsAreConfiguredBeforeForcemerge() throws Except
    public void testDownsampling() throws Exception {
        String dataStreamName = randomAlphaOfLength(10).toLowerCase(Locale.ROOT);
        int numBackingIndices = 2;
-        Metadata.Builder builder = Metadata.builder();
+        final var projectId = randomProjectIdOrDefault();
+        ProjectMetadata.Builder builder = ProjectMetadata.builder(projectId);
        DataStream dataStream = createDataStream(
            builder,
            dataStreamName,
@@ -1247,12 +1187,12 @@ public void testDownsampling() throws Exception {
        DiscoveryNodes.Builder nodesBuilder = buildNodes(nodeId);
        // we are the master node
        nodesBuilder.masterNodeId(nodeId);
-        ClusterState state = ClusterState.builder(ClusterName.DEFAULT).metadata(builder).nodes(nodesBuilder).build();
+        ClusterState state = ClusterState.builder(ClusterName.DEFAULT).putProjectMetadata(builder).nodes(nodesBuilder).build();
        setState(clusterService, state);

        String firstGenIndexName = DataStream.getDefaultBackingIndexName(dataStreamName, 1);
-        Index firstGenIndex = clusterService.state().metadata().getProject().index(firstGenIndexName).getIndex();
+        Index firstGenIndex = clusterService.state().metadata().getProject(projectId).index(firstGenIndexName).getIndex();
        Set<Index> affectedIndices = dataStreamLifecycleService.maybeExecuteDownsampling(
-            clusterService.state(),
+            clusterService.state().projectState(projectId),
            dataStream,
            List.of(firstGenIndex)
        );
@@ -1264,38 +1204,42 @@ public void testDownsampling() throws Exception {
        {
            // we do the read-only bit ourselves, since this is a unit test
-            Metadata.Builder metadataBuilder = Metadata.builder(state.metadata());
-            IndexMetadata indexMetadata = metadataBuilder.getSafe(firstGenIndex);
+            ProjectMetadata.Builder newProjectBuilder = ProjectMetadata.builder(state.metadata().getProject(projectId));
+            IndexMetadata indexMetadata = newProjectBuilder.getSafe(firstGenIndex);
            Settings updatedSettings = Settings.builder().put(indexMetadata.getSettings()).put(WRITE.settingName(), true).build();
-            metadataBuilder.put(
+            newProjectBuilder.put(
                IndexMetadata.builder(indexMetadata).settings(updatedSettings).settingsVersion(indexMetadata.getSettingsVersion() + 1)
            );

            ClusterBlock indexBlock = MetadataIndexStateService.createUUIDBasedBlock(WRITE.getBlock());
            ClusterBlocks.Builder blocks = ClusterBlocks.builder(state.blocks());
-            blocks.addIndexBlock(firstGenIndexName, indexBlock);
+            blocks.addIndexBlock(projectId, firstGenIndexName, indexBlock);

-            state = ClusterState.builder(state).blocks(blocks).metadata(metadataBuilder).build();
+            state = ClusterState.builder(state).blocks(blocks).putProjectMetadata(newProjectBuilder).build();
            setState(clusterService, state);
        }

        // on the next run downsampling should be triggered
-        affectedIndices = dataStreamLifecycleService.maybeExecuteDownsampling(clusterService.state(), dataStream, List.of(firstGenIndex));
+        affectedIndices = dataStreamLifecycleService.maybeExecuteDownsampling(
+            clusterService.state().projectState(projectId),
+            dataStream,
+            List.of(firstGenIndex)
+        );
        assertThat(affectedIndices, is(Set.of(firstGenIndex)));
        assertThat(clientSeenRequests.size(), is(2));
        assertThat(clientSeenRequests.get(1), instanceOf(DownsampleAction.Request.class));
        String downsampleIndexName = DownsampleConfig.generateDownsampleIndexName(
            DOWNSAMPLED_INDEX_PREFIX,
-            state.metadata().getProject().index(firstGenIndex),
+            state.metadata().getProject(projectId).index(firstGenIndex),
            new DateHistogramInterval("5m")
        );
        {
            // let's simulate the in-progress downsampling
-            IndexMetadata firstGenMetadata = state.metadata().getProject().index(firstGenIndexName);
-            Metadata.Builder metadataBuilder = Metadata.builder(state.metadata());
+            IndexMetadata firstGenMetadata = state.metadata().getProject(projectId).index(firstGenIndexName);
+            ProjectMetadata.Builder newProjectBuilder = ProjectMetadata.builder(state.metadata().getProject(projectId));

-            metadataBuilder.put(
+            newProjectBuilder.put(
                IndexMetadata.builder(downsampleIndexName)
                    .settings(
                        Settings.builder()
@@ -1306,22 +1250,26 @@ public void testDownsampling() throws Exception {
                    .numberOfReplicas(0)
                    .numberOfShards(1)
            );
-            state = ClusterState.builder(state).metadata(metadataBuilder).build();
+            state = ClusterState.builder(state).putProjectMetadata(newProjectBuilder).build();
            setState(clusterService, state);
        }

        // on the next run nothing should be triggered, as downsampling is in progress (i.e. the STATUS is STARTED)
-        affectedIndices = dataStreamLifecycleService.maybeExecuteDownsampling(clusterService.state(), dataStream, List.of(firstGenIndex));
+        affectedIndices = dataStreamLifecycleService.maybeExecuteDownsampling(
+            clusterService.state().projectState(projectId),
+            dataStream,
+            List.of(firstGenIndex)
+        );
        assertThat(affectedIndices, is(Set.of(firstGenIndex)));
        // still only 2 witnessed requests, nothing extra
        assertThat(clientSeenRequests.size(), is(2));

        {
            // mark the downsample operation as complete
-            IndexMetadata firstGenMetadata = state.metadata().getProject().index(firstGenIndexName);
-            Metadata.Builder metadataBuilder = Metadata.builder(state.metadata());
+            IndexMetadata firstGenMetadata = state.metadata().getProject(projectId).index(firstGenIndexName);
+            ProjectMetadata.Builder newProjectBuilder = ProjectMetadata.builder(state.metadata().getProject(projectId));

-            metadataBuilder.put(
+            newProjectBuilder.put(
                IndexMetadata.builder(downsampleIndexName)
                    .settings(
                        Settings.builder()
@@ -1333,23 +1281,27 @@ public void testDownsampling() throws Exception {
                    .numberOfReplicas(0)
                    .numberOfShards(1)
            );
-            state = ClusterState.builder(state).metadata(metadataBuilder).build();
+            state = ClusterState.builder(state).putProjectMetadata(newProjectBuilder).build();
            setState(clusterService, state);
        }

        // on this run, as downsampling is complete, we expect to trigger the {@link
        // org.elasticsearch.datastreams.lifecycle.downsampling.DeleteSourceAndAddDownsampleToDS}
        // cluster service task and delete the source index whilst adding the downsample index to the data stream
-        affectedIndices = dataStreamLifecycleService.maybeExecuteDownsampling(clusterService.state(), dataStream, List.of(firstGenIndex));
+        affectedIndices = dataStreamLifecycleService.maybeExecuteDownsampling(
+            clusterService.state().projectState(projectId),
+            dataStream,
+            List.of(firstGenIndex)
+        );
        assertThat(affectedIndices, is(Set.of(firstGenIndex)));

        assertBusy(() -> {
            ClusterState newState = clusterService.state();
-            IndexAbstraction downsample = newState.metadata().getProject().getIndicesLookup().get(downsampleIndexName);
+            IndexAbstraction downsample = newState.metadata().getProject(projectId).getIndicesLookup().get(downsampleIndexName);
            // the downsample index must be part of the data stream
            assertThat(downsample.getParentDataStream(), is(notNullValue()));
            assertThat(downsample.getParentDataStream().getName(), is(dataStreamName));
            // the source index was deleted
-            IndexAbstraction sourceIndexAbstraction = newState.metadata().getProject().getIndicesLookup().get(firstGenIndexName);
+            IndexAbstraction sourceIndexAbstraction = newState.metadata().getProject(projectId).getIndicesLookup().get(firstGenIndexName);
            assertThat(sourceIndexAbstraction, is(nullValue()));

            // no further requests should be triggered
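The three phases the test walks through (write block in place, STARTED, SUCCESS) all hang off the downsample status recorded in the downsample index's settings. A rough sketch of that gate, assuming the `IndexMetadata.INDEX_DOWNSAMPLE_STATUS` setting and not quoting the service's actual control flow:

// Illustrative only: the decision the assertions above exercise.
IndexMetadata.DownsampleTaskStatus status = IndexMetadata.INDEX_DOWNSAMPLE_STATUS.get(downsampleIndexMetadata.getSettings());
switch (status) {
    case STARTED -> {
        // in flight: keep the source index excluded from retention/forcemerge and check again next run
    }
    case SUCCESS -> {
        // swap the downsample index into the data stream, then delete the source index
    }
    case UNKNOWN -> {
        // nothing recorded yet: issue the downsample request once the source index is read-only
    }
}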
@@ -1360,7 +1312,8 @@ public void testDownsampling() throws Exception {
    public void testDownsamplingWhenTargetIndexNameClashYieldsException() throws Exception {
        String dataStreamName = randomAlphaOfLength(10).toLowerCase(Locale.ROOT);
        int numBackingIndices = 2;
-        Metadata.Builder builder = Metadata.builder();
+        final var projectId = randomProjectIdOrDefault();
+        ProjectMetadata.Builder builder = ProjectMetadata.builder(projectId);
        DataStream dataStream = createDataStream(
            builder,
            dataStreamName,
@@ -1392,27 +1345,32 @@ public void testDownsamplingWhenTargetIndexNameClashYieldsException() throws Exc
        ClusterBlock indexBlock = MetadataIndexStateService.createUUIDBasedBlock(WRITE.getBlock());
        ClusterBlocks.Builder blocks = ClusterBlocks.builder();
-        blocks.addIndexBlock(firstGenIndexName, indexBlock);
-        ClusterState state = ClusterState.builder(ClusterName.DEFAULT).blocks(blocks).metadata(builder).nodes(nodesBuilder).build();
+        blocks.addIndexBlock(projectId, firstGenIndexName, indexBlock);
+        ClusterState state = ClusterState.builder(ClusterName.DEFAULT)
+            .blocks(blocks)
+            .putProjectMetadata(builder)
+            .nodes(nodesBuilder)
+            .build();

        // add another index to the cluster state that clashes with the expected downsample index name for the configured round
        String downsampleIndexName = DownsampleConfig.generateDownsampleIndexName(
            DOWNSAMPLED_INDEX_PREFIX,
-            state.metadata().getProject().index(firstGenIndexName),
+            state.metadata().getProject(projectId).index(firstGenIndexName),
            new DateHistogramInterval("5m")
        );
-        Metadata.Builder newMetadata = Metadata.builder(state.metadata())
+        ProjectMetadata.Builder newProject = ProjectMetadata.builder(state.metadata().getProject(projectId))
            .put(
                IndexMetadata.builder(downsampleIndexName).settings(settings(IndexVersion.current())).numberOfReplicas(0).numberOfShards(1)
            );
-        state = ClusterState.builder(state).metadata(newMetadata).nodes(nodesBuilder).build();
+        state = ClusterState.builder(state).putProjectMetadata(newProject).nodes(nodesBuilder).build();
        setState(clusterService, state);

-        Index firstGenIndex = state.metadata().getProject().index(firstGenIndexName).getIndex();
-        dataStreamLifecycleService.maybeExecuteDownsampling(clusterService.state(), dataStream, List.of(firstGenIndex));
+        final var projectState = clusterService.state().projectState(projectId);
+        Index firstGenIndex = projectState.metadata().index(firstGenIndexName).getIndex();
+        dataStreamLifecycleService.maybeExecuteDownsampling(projectState, dataStream, List.of(firstGenIndex));

        assertThat(clientSeenRequests.size(), is(0));
-        ErrorEntry error = dataStreamLifecycleService.getErrorStore().getError(Metadata.DEFAULT_PROJECT_ID, firstGenIndexName);
+        ErrorEntry error = dataStreamLifecycleService.getErrorStore().getError(projectId, firstGenIndexName);
        assertThat(error, notNullValue());
        assertThat(error.error(), containsString("resource_already_exists_exception"));
    }
@@ -1427,19 +1385,22 @@ public void testTimeSeriesIndicesStillWithinTimeBounds() {
        Instant start3 = currentTime.plus(2, ChronoUnit.HOURS);
        Instant end3 = currentTime.plus(4, ChronoUnit.HOURS);

+        final var projectId = randomProjectIdOrDefault();
        String dataStreamName = "logs_my-app_prod";
        var clusterState = DataStreamTestHelper.getClusterStateWithDataStream(
+            projectId,
            dataStreamName,
            List.of(Tuple.tuple(start1, end1), Tuple.tuple(start2, end2), Tuple.tuple(start3, end3))
        );
-        DataStream dataStream = clusterState.getMetadata().getProject().dataStreams().get(dataStreamName);
+        final var project = clusterState.metadata().getProject(projectId);
+        DataStream dataStream = project.dataStreams().get(dataStreamName);

        {
            // test for an index for which `now` is outside its time bounds
            Index firstGenIndex = dataStream.getIndices().get(0);
            Set<Index> indices = DataStreamLifecycleService.timeSeriesIndicesStillWithinTimeBounds(
-                clusterState.metadata(),
                // the end_time for the first generation has lapsed
+                project,
                List.of(firstGenIndex),
                currentTime::toEpochMilli
            );
@@ -1448,8 +1409,8 @@ public void testTimeSeriesIndicesStillWithinTimeBounds() {

        {
            Set<Index> indices = DataStreamLifecycleService.timeSeriesIndicesStillWithinTimeBounds(
-                clusterState.metadata(),
                // the end_time for the first generation has lapsed, but the other 2 generations are still within bounds
+                project,
                dataStream.getIndices(),
                currentTime::toEpochMilli
            );
@@ -1463,10 +1424,10 @@ public void testTimeSeriesIndicesStillWithinTimeBounds() {
            .settings(indexSettings(1, 1).put(IndexMetadata.SETTING_INDEX_VERSION_CREATED.getKey(), IndexVersion.current()))
            .build();

-        Metadata newMetadata = Metadata.builder(clusterState.metadata()).put(indexMeta, true).build();
+        ProjectMetadata newProject = ProjectMetadata.builder(project).put(indexMeta, true).build();

        Set<Index> indices = DataStreamLifecycleService.timeSeriesIndicesStillWithinTimeBounds(
-            newMetadata,
+            newProject,
            List.of(indexMeta.getIndex()),
            currentTime::toEpochMilli
        );
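`timeSeriesIndicesStillWithinTimeBounds` now takes the project metadata directly, but the underlying check is unchanged: an index stays out of the run while its time-series end time has not lapsed. Illustratively, assuming the standard `index.time_series.end_time` setting rather than quoting the helper's code:

// An index is still "within its time bounds" while its end_time lies in the future.
Instant endTime = IndexSettings.TIME_SERIES_END_TIME.get(indexMetadata.getSettings());
boolean stillWithinTimeBounds = endTime.toEpochMilli() > nowSupplier.getAsLong();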
@@ -1488,7 +1449,8 @@ public void testTrackingTimeStats() {
            errorStore,
            mock(AllocationService.class),
            new DataStreamLifecycleHealthInfoPublisher(Settings.EMPTY, getTransportRequestsRecordingClient(), clusterService, errorStore),
-            globalRetentionSettings
+            globalRetentionSettings,
+            TestProjectResolvers.mustExecuteFirst()
        );
        assertThat(service.getLastRunDuration(), is(nullValue()));
        assertThat(service.getTimeBetweenStarts(), is(nullValue()));
@@ -1513,7 +1475,7 @@ public void testTargetIndices() {
            case 2 -> DataStreamOptions.FAILURE_STORE_DISABLED;
            default -> throw new IllegalStateException("Unexpected value: " + mutationBranch);
        };
-        Metadata.Builder builder = Metadata.builder();
+        ProjectMetadata.Builder builder = ProjectMetadata.builder(randomProjectIdOrDefault());
        DataStream dataStream = createDataStream(
            builder,
            dataStreamName,
@@ -1524,21 +1486,16 @@ public void testTargetIndices() {
            now
        ).copy().setDataStreamOptions(dataStreamOptions).build(); // failure store is managed even when disabled
        builder.put(dataStream);
-        Metadata metadata = builder.build();
+        ProjectMetadata project = builder.build();
        Set<Index> indicesToExclude = Set.of(dataStream.getIndices().get(0), dataStream.getFailureIndices().get(0));
        List<Index> targetBackingIndicesOnly = DataStreamLifecycleService.getTargetIndices(
            dataStream,
            indicesToExclude,
-            metadata.getProject()::index,
+            project::index,
            false
        );
        assertThat(targetBackingIndicesOnly, equalTo(dataStream.getIndices().subList(1, 3)));
-        List<Index> targetIndices = DataStreamLifecycleService.getTargetIndices(
-            dataStream,
-            indicesToExclude,
-            metadata.getProject()::index,
-            true
-        );
+        List<Index> targetIndices = DataStreamLifecycleService.getTargetIndices(dataStream, indicesToExclude, project::index, true);
        assertThat(
            targetIndices,
            equalTo(List.of(dataStream.getIndices().get(1), dataStream.getIndices().get(2), dataStream.getFailureIndices().get(1)))
@@ -1548,7 +1505,7 @@ public void testTargetIndices() {
    public void testFailureStoreIsManagedEvenWhenDisabled() {
        String dataStreamName = randomAlphaOfLength(10).toLowerCase(Locale.ROOT);
        int numBackingIndices = 1;
-        Metadata.Builder builder = Metadata.builder();
+        ProjectMetadata.Builder builder = ProjectMetadata.builder(randomProjectIdOrDefault());
        DataStream dataStream = createDataStream(
            builder,
            dataStreamName,
@@ -1560,7 +1517,7 @@ public void testFailureStoreIsManagedEvenWhenDisabled() {
        ).copy().setDataStreamOptions(DataStreamOptions.FAILURE_STORE_DISABLED).build(); // failure store is managed even when disabled
        builder.put(dataStream);
-        ClusterState state = ClusterState.builder(ClusterName.DEFAULT).metadata(builder).build();
+        ClusterState state = ClusterState.builder(ClusterName.DEFAULT).putProjectMetadata(builder).build();

        dataStreamLifecycleService.run(state);
        assertThat(clientSeenRequests.size(), is(3));
@@ -1578,40 +1535,46 @@ public void testFailureStoreIsManagedEvenWhenDisabled() {
    public void testMaybeExecuteRetentionSuccessfulDownsampledIndex() {
        String dataStreamName = randomAlphaOfLength(10).toLowerCase(Locale.ROOT);
-        ClusterState state = downsampleSetup(dataStreamName, SUCCESS);
-        DataStream dataStream = state.metadata().getProject().dataStreams().get(dataStreamName);
+        final var projectId = randomProjectIdOrDefault();
+        ClusterState state = downsampleSetup(projectId, dataStreamName, SUCCESS);
+        final var project = state.metadata().getProject(projectId);
+        DataStream dataStream = project.dataStreams().get(dataStreamName);
        String firstGenIndexName = DataStream.getDefaultBackingIndexName(dataStreamName, 1);

        // Executing the method to be tested:
-        Set<Index> indicesToBeRemoved = dataStreamLifecycleService.maybeExecuteRetention(clusterService.state(), dataStream, Set.of());
-        assertThat(indicesToBeRemoved, contains(state.getMetadata().getProject().index(firstGenIndexName).getIndex()));
+        Set<Index> indicesToBeRemoved = dataStreamLifecycleService.maybeExecuteRetention(project, dataStream, Set.of());
+        assertThat(indicesToBeRemoved, contains(project.index(firstGenIndexName).getIndex()));
    }

    public void testMaybeExecuteRetentionDownsampledIndexInProgress() {
        String dataStreamName = randomAlphaOfLength(10).toLowerCase(Locale.ROOT);
-        ClusterState state = downsampleSetup(dataStreamName, STARTED);
-        DataStream dataStream = state.metadata().getProject().dataStreams().get(dataStreamName);
+        final var projectId = randomProjectIdOrDefault();
+        ClusterState state = downsampleSetup(projectId, dataStreamName, STARTED);
+        final var project = state.metadata().getProject(projectId);
+        DataStream dataStream = project.dataStreams().get(dataStreamName);
        String firstGenIndexName = DataStream.getDefaultBackingIndexName(dataStreamName, 1);

        // Executing the method to be tested:
-        Set<Index> indicesToBeRemoved = dataStreamLifecycleService.maybeExecuteRetention(clusterService.state(), dataStream, Set.of());
+        Set<Index> indicesToBeRemoved = dataStreamLifecycleService.maybeExecuteRetention(project, dataStream, Set.of());
        assertThat(indicesToBeRemoved, empty());
    }

    public void testMaybeExecuteRetentionDownsampledUnknown() {
        String dataStreamName = randomAlphaOfLength(10).toLowerCase(Locale.ROOT);
-        ClusterState state = downsampleSetup(dataStreamName, UNKNOWN);
-        DataStream dataStream = state.metadata().getProject().dataStreams().get(dataStreamName);
+        final var projectId = randomProjectIdOrDefault();
+        ClusterState state = downsampleSetup(projectId, dataStreamName, UNKNOWN);
+        final var project = state.metadata().getProject(projectId);
+        DataStream dataStream = project.dataStreams().get(dataStreamName);
        String firstGenIndexName = DataStream.getDefaultBackingIndexName(dataStreamName, 1);

        // Executing the method to be tested:
-        Set<Index> indicesToBeRemoved = dataStreamLifecycleService.maybeExecuteRetention(clusterService.state(), dataStream, Set.of());
-        assertThat(indicesToBeRemoved, contains(state.getMetadata().getProject().index(firstGenIndexName).getIndex()));
+        Set<Index> indicesToBeRemoved = dataStreamLifecycleService.maybeExecuteRetention(project, dataStream, Set.of());
+        assertThat(indicesToBeRemoved, contains(project.index(firstGenIndexName).getIndex()));
    }

-    private ClusterState downsampleSetup(String dataStreamName, IndexMetadata.DownsampleTaskStatus status) {
+    private ClusterState downsampleSetup(ProjectId projectId, String dataStreamName, IndexMetadata.DownsampleTaskStatus status) {
        // Base setup:
-        Metadata.Builder builder = Metadata.builder();
+        ProjectMetadata.Builder builder = ProjectMetadata.builder(projectId);
        DataStream dataStream = createDataStream(
            builder,
            dataStreamName,
@@ -1639,7 +1602,7 @@ private ClusterState downsampleSetup(String dataStreamName, IndexMetadata.Downsa
        String nodeId = "localNode";
        DiscoveryNodes.Builder nodesBuilder = buildNodes(nodeId);
        nodesBuilder.masterNodeId(nodeId);
-        ClusterState state = ClusterState.builder(ClusterName.DEFAULT).metadata(builder).nodes(nodesBuilder).build();
+        ClusterState state = ClusterState.builder(ClusterName.DEFAULT).putProjectMetadata(builder).nodes(nodesBuilder).build();
        setState(clusterService, state);
        return state;
    }
@@ -1648,10 +1611,7 @@ private ClusterState downsampleSetup(String dataStreamName, IndexMetadata.Downsa
     * Creates a test cluster state with the given indexName. If customDataStreamLifecycleMetadata is not null, it is added as the value
     * of the index's custom metadata named "data_stream_lifecycle".
     */
-    private ClusterState createClusterState(String indexName, Map<String, String> customDataStreamLifecycleMetadata) {
-        var routingTableBuilder = RoutingTable.builder();
-        Metadata.Builder metadataBuilder = Metadata.builder();
-        Map<String, IndexMetadata> indices = new HashMap<>();
+    private ClusterState createClusterState(ProjectId projectId, String indexName, Map<String, String> customDataStreamLifecycleMetadata) {
        IndexMetadata.Builder indexMetadataBuilder = IndexMetadata.builder(indexName)
            .version(randomLong())
            .settings(
@@ -1663,11 +1623,8 @@ private ClusterState createClusterState(String indexName, Map<String, String> cu
        if (customDataStreamLifecycleMetadata != null) {
            indexMetadataBuilder.putCustom(LIFECYCLE_CUSTOM_INDEX_METADATA_KEY, customDataStreamLifecycleMetadata);
        }
-        indices.put(indexName, indexMetadataBuilder.build());
-        return ClusterState.builder(new ClusterName("test-cluster"))
-            .routingTable(routingTableBuilder.build())
-            .metadata(metadataBuilder.indices(indices).build())
-            .build();
+        final var project = ProjectMetadata.builder(projectId).put(indexMetadataBuilder.build(), false).build();
+        return ClusterState.builder(ClusterName.DEFAULT).putProjectMetadata(project).build();
    }

    private static DataStreamLifecycleService.ForceMergeRequestWrapper copyForceMergeRequestWrapperRequest(
diff --git a/server/src/main/java/org/elasticsearch/cluster/project/ProjectResolver.java b/server/src/main/java/org/elasticsearch/cluster/project/ProjectResolver.java
index cb5a499c7df1f..d97c7bfb29aab 100644
--- a/server/src/main/java/org/elasticsearch/cluster/project/ProjectResolver.java
+++ b/server/src/main/java/org/elasticsearch/cluster/project/ProjectResolver.java
@@ -9,6 +9,12 @@
 package org.elasticsearch.cluster.project;

+import org.elasticsearch.action.ActionListener;
+import org.elasticsearch.action.ActionRequest;
+import org.elasticsearch.action.ActionResponse;
+import org.elasticsearch.action.ActionType;
+import org.elasticsearch.client.internal.Client;
+import org.elasticsearch.client.internal.FilterClient;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ProjectState;
import org.elasticsearch.cluster.metadata.Metadata;
@@ -77,6 +83,27 @@ default Collection<ProjectId> getProjectIds(ClusterState clusterState) {
     */
    <E extends Exception> void executeOnProject(ProjectId projectId, CheckedRunnable<E> body) throws E;

+    /**
+     * Returns a client that executes every request in the context of the given project.
+     */
+    default Client projectClient(Client baseClient, ProjectId projectId) {
+        // We only take the shortcut when the given project ID matches the "current" project ID. If it doesn't, we'll let
+        // #executeOnProject take care of error handling.
+        if (supportsMultipleProjects() == false && projectId.equals(getProjectId())) {
+            return baseClient;
+        }
+        return new FilterClient(baseClient) {
+            @Override
+            protected <Request extends ActionRequest, Response extends ActionResponse> void doExecute(
+                ActionType<Response> action,
+                Request request,
+                ActionListener<Response> listener
+            ) {
+                executeOnProject(projectId, () -> super.doExecute(action, request, listener));
+            }
+        };
+    }
+
    /**
     * Returns {@code false} if the cluster runs in a setup that always expects only a single default project (see also
     * {@link Metadata#DEFAULT_PROJECT_ID}).
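A plausible call site for the new default method, shown only to illustrate the wiring (the variable names here are assumptions, not code from this change): a per-project lifecycle run can wrap the node client once and route every request it issues through that project's context.

// Assumed usage sketch: wrap once per project run, then use the wrapped client everywhere.
Client projectScopedClient = projectResolver.projectClient(client, projectId);
projectScopedClient.admin().indices().forceMerge(forceMergeRequest, listener);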