diff --git a/modules/data-streams/src/main/java/org/elasticsearch/datastreams/DataStreamsPlugin.java b/modules/data-streams/src/main/java/org/elasticsearch/datastreams/DataStreamsPlugin.java
index 10ebb25563355..f4f1954552fc2 100644
--- a/modules/data-streams/src/main/java/org/elasticsearch/datastreams/DataStreamsPlugin.java
+++ b/modules/data-streams/src/main/java/org/elasticsearch/datastreams/DataStreamsPlugin.java
@@ -47,6 +47,7 @@
 import org.elasticsearch.datastreams.action.TransportPromoteDataStreamAction;
 import org.elasticsearch.datastreams.action.TransportUpdateDataStreamMappingsAction;
 import org.elasticsearch.datastreams.action.TransportUpdateDataStreamSettingsAction;
+import org.elasticsearch.datastreams.lifecycle.AdditionalDataStreamLifecycleActions;
 import org.elasticsearch.datastreams.lifecycle.DataStreamLifecycleErrorStore;
 import org.elasticsearch.datastreams.lifecycle.DataStreamLifecycleService;
 import org.elasticsearch.datastreams.lifecycle.action.DeleteDataStreamLifecycleAction;
@@ -86,6 +87,7 @@
 import org.elasticsearch.health.HealthIndicatorService;
 import org.elasticsearch.index.IndexSettingProvider;
 import org.elasticsearch.plugins.ActionPlugin;
+import org.elasticsearch.plugins.ExtensiblePlugin;
 import org.elasticsearch.plugins.HealthPlugin;
 import org.elasticsearch.plugins.Plugin;
 import org.elasticsearch.rest.RestController;
@@ -102,7 +104,7 @@
 import static org.elasticsearch.cluster.metadata.DataStreamLifecycle.DATA_STREAM_LIFECYCLE_ORIGIN;
 
-public class DataStreamsPlugin extends Plugin implements ActionPlugin, HealthPlugin {
+public class DataStreamsPlugin extends Plugin implements ActionPlugin, HealthPlugin, ExtensiblePlugin {
 
     public static final Setting TIME_SERIES_POLL_INTERVAL = Setting.timeSetting(
         "time_series.poll_interval",
@@ -153,6 +155,7 @@ public static TimeValue getLookAheadTime(Settings settings) {
     private final SetOnce dataStreamLifecycleErrorsPublisher = new SetOnce<>();
     private final SetOnce dataStreamLifecycleHealthIndicatorService = new SetOnce<>();
     private final Settings settings;
+    private AdditionalDataStreamLifecycleActions additionalDataStreamLifecycleActions;
 
     public DataStreamsPlugin(Settings settings) {
         this.settings = settings;
@@ -220,7 +223,8 @@ public Collection createComponents(PluginServices services) {
                 errorStoreInitialisationService.get(),
                 services.allocationService(),
                 dataStreamLifecycleErrorsPublisher.get(),
-                services.dataStreamGlobalRetentionSettings()
+                services.dataStreamGlobalRetentionSettings(),
+                additionalDataStreamLifecycleActions
             )
         );
         dataLifecycleInitialisationService.get().init();
@@ -314,4 +318,15 @@ public void close() throws IOException {
     public Collection getHealthIndicatorServices() {
         return List.of(dataStreamLifecycleHealthIndicatorService.get());
     }
+
+    @Override
+    public void loadExtensions(ExtensionLoader loader) {
+        List<AdditionalDataStreamLifecycleActions> dataStreamLifecycleActions = loader.loadExtensions(
+            AdditionalDataStreamLifecycleActions.class
+        );
+        assert dataStreamLifecycleActions.size() <= 1;
+        if (dataStreamLifecycleActions.isEmpty() == false) {
+            this.additionalDataStreamLifecycleActions = dataStreamLifecycleActions.getFirst();
+        }
+    }
 }
diff --git a/modules/data-streams/src/main/java/org/elasticsearch/datastreams/lifecycle/AdditionalDataStreamLifecycleActions.java b/modules/data-streams/src/main/java/org/elasticsearch/datastreams/lifecycle/AdditionalDataStreamLifecycleActions.java
new file mode 100644
index 0000000000000..199bcfde8e87c
--- /dev/null
+++ b/modules/data-streams/src/main/java/org/elasticsearch/datastreams/lifecycle/AdditionalDataStreamLifecycleActions.java
@@ -0,0 +1,16 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the "Elastic License
+ * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
+ * Public License v 1"; you may not use this file except in compliance with, at
+ * your election, the "Elastic License 2.0", the "GNU Affero General Public
+ * License v3.0 only", or the "Server Side Public License, v 1".
+ */
+
+package org.elasticsearch.datastreams.lifecycle;
+
+import java.util.List;
+
+public interface AdditionalDataStreamLifecycleActions {
+    List<DataStreamLifecycleAction> getDataStreamLifecycleActions();
+}
diff --git a/modules/data-streams/src/main/java/org/elasticsearch/datastreams/lifecycle/DataStreamLifecycleAction.java b/modules/data-streams/src/main/java/org/elasticsearch/datastreams/lifecycle/DataStreamLifecycleAction.java
new file mode 100644
index 0000000000000..f8961a2762b0a
--- /dev/null
+++ b/modules/data-streams/src/main/java/org/elasticsearch/datastreams/lifecycle/DataStreamLifecycleAction.java
@@ -0,0 +1,36 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the "Elastic License
+ * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
+ * Public License v 1"; you may not use this file except in compliance with, at
+ * your election, the "Elastic License 2.0", the "GNU Affero General Public
+ * License v3.0 only", or the "Server Side Public License, v 1".
+ */
+
+package org.elasticsearch.datastreams.lifecycle;
+
+import org.elasticsearch.client.internal.Client;
+import org.elasticsearch.cluster.ProjectState;
+import org.elasticsearch.cluster.metadata.DataStream;
+import org.elasticsearch.index.Index;
+
+import java.util.Set;
+
+@FunctionalInterface
+public interface DataStreamLifecycleAction {
+    /**
+     * This takes some action on the data stream. The action is expected to be fast, or run asynchronously. It returns a set of indices
+     * that ought to be ignored by subsequent actions in the current pass.
+     *
+     * @param projectState The current ProjectState
+     * @param dataStream The data stream to be acted upon
+     * @param indicesToExcludeForRemainingRun A set of indices that ought to be ignored by this action.
+     */
+    Set<Index> apply(
+        ProjectState projectState,
+        DataStream dataStream,
+        Set<Index> indicesToExcludeForRemainingRun,
+        Client client,
+        DataStreamLifecycleErrorStore errorStore
+    );
+}
diff --git a/modules/data-streams/src/main/java/org/elasticsearch/datastreams/lifecycle/DataStreamLifecycleService.java b/modules/data-streams/src/main/java/org/elasticsearch/datastreams/lifecycle/DataStreamLifecycleService.java
index 5a998f43d0103..e059dc1565cc8 100644
--- a/modules/data-streams/src/main/java/org/elasticsearch/datastreams/lifecycle/DataStreamLifecycleService.java
+++ b/modules/data-streams/src/main/java/org/elasticsearch/datastreams/lifecycle/DataStreamLifecycleService.java
@@ -176,10 +176,11 @@ public class DataStreamLifecycleService implements ClusterStateListener, Closeab
     private volatile RolloverConfiguration rolloverConfiguration;
     private SchedulerEngine.Job scheduledJob;
     private final SetOnce scheduler = new SetOnce<>();
-    private final MasterServiceTaskQueue forceMergeClusterStateUpdateTaskQueue;
+    private final MasterServiceTaskQueue dlmCustomMetadataClusterStateUpdateTaskQueue;
     private final MasterServiceTaskQueue swapSourceWithDownsampleIndexQueue;
     private volatile ByteSizeValue targetMergePolicyFloorSegment;
     private volatile int targetMergePolicyFactor;
+    private final AdditionalDataStreamLifecycleActions additionalDataStreamLifecycleActions;
     /**
      * The number of retries for a particular index and error after which DSL will emmit a signal (e.g. log statement)
      */
@@ -192,16 +193,18 @@ public class DataStreamLifecycleService implements ClusterStateListener, Closeab
     private volatile Long lastRunDuration = null;
     private volatile Long timeBetweenStarts = null;
 
-    private static final SimpleBatchedExecutor FORCE_MERGE_STATE_UPDATE_TASK_EXECUTOR =
-        new SimpleBatchedExecutor<>() {
+    private static final SimpleBatchedExecutor<
+        UpdateDataStreamLifecycleCustomMetadataTask,
+        Void> DLM_CUSTOM_METADATA_STATE_UPDATE_TASK_EXECUTOR = new SimpleBatchedExecutor<>() {
             @Override
-            public Tuple executeTask(UpdateForceMergeCompleteTask task, ClusterState clusterState) throws Exception {
+            public Tuple executeTask(UpdateDataStreamLifecycleCustomMetadataTask task, ClusterState clusterState)
+                throws Exception {
                 return Tuple.tuple(task.execute(clusterState), null);
             }
 
             @Override
-            public void taskSucceeded(UpdateForceMergeCompleteTask task, Void unused) {
-                logger.trace("Updated cluster state for force merge of index [{}]", task.targetIndex);
+            public void taskSucceeded(UpdateDataStreamLifecycleCustomMetadataTask task, Void unused) {
+                logger.trace("Updated cluster state for {} of index [{}]", task.key, task.targetIndex);
                 task.listener.onResponse(null);
             }
         };
@@ -216,7 +219,8 @@ public DataStreamLifecycleService(
         DataStreamLifecycleErrorStore errorStore,
         AllocationService allocationService,
         DataStreamLifecycleHealthInfoPublisher dataStreamLifecycleHealthInfoPublisher,
-        DataStreamGlobalRetentionSettings globalRetentionSettings
+        DataStreamGlobalRetentionSettings globalRetentionSettings,
+        @Nullable AdditionalDataStreamLifecycleActions additionalDataStreamLifecycleActions
     ) {
         this.settings = settings;
         this.client = client;
@@ -235,10 +239,10 @@ public DataStreamLifecycleService(
         this.signallingErrorRetryInterval = DATA_STREAM_SIGNALLING_ERROR_RETRY_INTERVAL_SETTING.get(settings);
         this.rolloverConfiguration = clusterService.getClusterSettings()
            .get(DataStreamLifecycle.CLUSTER_LIFECYCLE_DEFAULT_ROLLOVER_SETTING);
-        this.forceMergeClusterStateUpdateTaskQueue = clusterService.createTaskQueue(
+
this.dlmCustomMetadataClusterStateUpdateTaskQueue = clusterService.createTaskQueue( "data-stream-lifecycle-forcemerge-state-update", Priority.LOW, - FORCE_MERGE_STATE_UPDATE_TASK_EXECUTOR + DLM_CUSTOM_METADATA_STATE_UPDATE_TASK_EXECUTOR ); this.swapSourceWithDownsampleIndexQueue = clusterService.createTaskQueue( "data-stream-lifecycle-swap-source-with-downsample", @@ -246,6 +250,7 @@ public DataStreamLifecycleService( new DeleteSourceAndAddDownsampleIndexExecutor(allocationService) ); this.dslHealthInfoPublisher = dataStreamLifecycleHealthInfoPublisher; + this.additionalDataStreamLifecycleActions = additionalDataStreamLifecycleActions; } /** @@ -358,6 +363,13 @@ private void run(ProjectState projectState) { final var project = projectState.metadata(); int affectedIndices = 0; int affectedDataStreams = 0; + List actions = List.of( + this::maybeExecuteRollover, + this::timeSeriesIndicesStillWithinTimeBounds, + this::maybeExecuteRetention, + this::maybeExecuteForceMerge, + this::maybeExecuteDownsampling + ); for (DataStream dataStream : project.dataStreams().values()) { clearErrorStoreForUnmanagedIndices(project, dataStream); var dataLifecycleEnabled = dataStream.getDataLifecycle() != null && dataStream.getDataLifecycle().enabled(); @@ -366,84 +378,20 @@ private void run(ProjectState projectState) { if (dataLifecycleEnabled == false && failuresLifecycleEnabled == false) { continue; } - - // Retrieve the effective retention to ensure the same retention is used for this data stream - // through all operations. - var dataRetention = getEffectiveRetention(dataStream, globalRetentionSettings, false); - var failuresRetention = getEffectiveRetention(dataStream, globalRetentionSettings, true); - // the following indices should not be considered for the remainder of this service run, for various reasons. Set indicesToExcludeForRemainingRun = new HashSet<>(); - - // These are the pre-rollover write indices. They may or may not be the write index after maybeExecuteRollover has executed, - // depending on rollover criteria, for this reason we exclude them for the remaining run. - indicesToExcludeForRemainingRun.add(maybeExecuteRollover(project, dataStream, dataRetention, false)); - Index failureStoreWriteIndex = maybeExecuteRollover(project, dataStream, failuresRetention, true); - if (failureStoreWriteIndex != null) { - indicesToExcludeForRemainingRun.add(failureStoreWriteIndex); - } - - // tsds indices that are still within their time bounds (i.e. now < time_series.end_time) - we don't want these indices to be - // deleted, forcemerged, or downsampled as they're still expected to receive large amounts of writes - indicesToExcludeForRemainingRun.addAll( - timeSeriesIndicesStillWithinTimeBounds( - project, - getTargetIndices(dataStream, indicesToExcludeForRemainingRun, project::index, false), - nowSupplier - ) - ); - - try { - indicesToExcludeForRemainingRun.addAll( - maybeExecuteRetention(project, dataStream, dataRetention, failuresRetention, indicesToExcludeForRemainingRun) - ); - } catch (Exception e) { - // individual index errors would be reported via the API action listener for every delete call - // we could potentially record errors at a data stream level and expose it via the _data_stream API? 
- logger.error( - () -> String.format( - Locale.ROOT, - "Data stream lifecycle failed to execute retention for data stream [%s]", - dataStream.getName() - ), - e - ); - } - - try { + for (DataStreamLifecycleAction action : actions) { indicesToExcludeForRemainingRun.addAll( - maybeExecuteForceMerge(project, getTargetIndices(dataStream, indicesToExcludeForRemainingRun, project::index, true)) - ); - } catch (Exception e) { - logger.error( - () -> String.format( - Locale.ROOT, - "Data stream lifecycle failed to execute force merge for data stream [%s]", - dataStream.getName() - ), - e + action.apply(projectState, dataStream, indicesToExcludeForRemainingRun, client, errorStore) ); } - - try { - indicesToExcludeForRemainingRun.addAll( - maybeExecuteDownsampling( - projectState, - dataStream, - getTargetIndices(dataStream, indicesToExcludeForRemainingRun, project::index, false) - ) - ); - } catch (Exception e) { - logger.error( - () -> String.format( - Locale.ROOT, - "Data stream lifecycle failed to execute downsampling for data stream [%s]", - dataStream.getName() - ), - e - ); + if (additionalDataStreamLifecycleActions != null) { + for (DataStreamLifecycleAction action : additionalDataStreamLifecycleActions.getDataStreamLifecycleActions()) { + indicesToExcludeForRemainingRun.addAll( + action.apply(projectState, dataStream, indicesToExcludeForRemainingRun, client, errorStore) + ); + } } - affectedIndices += indicesToExcludeForRemainingRun.size(); affectedDataStreams++; } @@ -457,8 +405,27 @@ private void run(ProjectState projectState) { ); } + private Set timeSeriesIndicesStillWithinTimeBounds( + ProjectState projectState, + DataStream dataStream, + Set indicesToExcludeForRemainingRun, + Client client, + DataStreamLifecycleErrorStore errorStore + ) { + return timeSeriesIndicesStillWithinTimeBounds(projectState, dataStream, indicesToExcludeForRemainingRun, nowSupplier); + } + // visible for testing - static Set timeSeriesIndicesStillWithinTimeBounds(ProjectMetadata project, List targetIndices, LongSupplier nowSupplier) { + static Set timeSeriesIndicesStillWithinTimeBounds( + ProjectState projectState, + DataStream dataStream, + Set indicesToExcludeForRemainingRun, + LongSupplier nowSupplier + ) { + // tsds indices that are still within their time bounds (i.e. now < time_series.end_time) - we don't want these indices to be + // deleted, forcemerged, or downsampled as they're still expected to receive large amounts of writes + ProjectMetadata project = projectState.metadata(); + List targetIndices = getTargetIndices(dataStream, indicesToExcludeForRemainingRun, project::index, false); Set tsIndicesWithinBounds = new HashSet<>(); for (Index index : targetIndices) { IndexMetadata backingIndex = project.index(index); @@ -496,41 +463,69 @@ static Set timeSeriesIndicesStillWithinTimeBounds(ProjectMetadata project * replacing an index in the data stream, deleting a source index, or downsampling itself) so these indices can be skipped in case * there are other operations to be executed by the data stream lifecycle after downsampling. 
*/ - Set maybeExecuteDownsampling(ProjectState projectState, DataStream dataStream, List targetIndices) { + Set maybeExecuteDownsampling( + ProjectState projectState, + DataStream dataStream, + Set indicesToExcludeForRemainingRun, + Client client, + DataStreamLifecycleErrorStore errorStore + ) { Set affectedIndices = new HashSet<>(); - final var project = projectState.metadata(); - for (Index index : targetIndices) { - IndexMetadata backingIndexMeta = project.index(index); - assert backingIndexMeta != null : "the data stream backing indices must exist"; - List downsamplingRounds = dataStream.getDownsamplingRoundsFor( - index, - project::index, - nowSupplier + try { + List targetIndices = getTargetIndices( + dataStream, + indicesToExcludeForRemainingRun, + projectState.metadata()::index, + false ); - if (downsamplingRounds.isEmpty()) { - continue; - } + final var project = projectState.metadata(); + for (Index index : targetIndices) { + IndexMetadata backingIndexMeta = project.index(index); + assert backingIndexMeta != null : "the data stream backing indices must exist"; + List downsamplingRounds = dataStream.getDownsamplingRoundsFor( + index, + project::index, + nowSupplier + ); + if (downsamplingRounds.isEmpty()) { + continue; + } - String indexName = index.getName(); - String downsamplingSourceIndex = IndexMetadata.INDEX_DOWNSAMPLE_SOURCE_NAME.get(backingIndexMeta.getSettings()); + String indexName = index.getName(); + String downsamplingSourceIndex = IndexMetadata.INDEX_DOWNSAMPLE_SOURCE_NAME.get(backingIndexMeta.getSettings()); - // if the current index is not a downsample we want to mark the index as read-only before proceeding with downsampling - if (org.elasticsearch.common.Strings.hasText(downsamplingSourceIndex) == false - && projectState.blocks().indexBlocked(project.id(), ClusterBlockLevel.WRITE, indexName) == false) { - affectedIndices.add(index); - addIndexBlockOnce(project.id(), indexName); - } else { - // we're not performing any operation for this index which means that it: - // - has matching downsample rounds - // - is read-only - // So let's wait for an in-progress downsampling operation to succeed or trigger the last matching round - var downsamplingMethod = dataStream.getDataLifecycle().downsamplingMethod(); - affectedIndices.addAll( - waitForInProgressOrTriggerDownsampling(dataStream, backingIndexMeta, downsamplingRounds, downsamplingMethod, project) - ); + // if the current index is not a downsample we want to mark the index as read-only before proceeding with downsampling + if (org.elasticsearch.common.Strings.hasText(downsamplingSourceIndex) == false + && projectState.blocks().indexBlocked(project.id(), ClusterBlockLevel.WRITE, indexName) == false) { + affectedIndices.add(index); + addIndexBlockOnce(project.id(), indexName); + } else { + // we're not performing any operation for this index which means that it: + // - has matching downsample rounds + // - is read-only + // So let's wait for an in-progress downsampling operation to succeed or trigger the last matching round + var downsamplingMethod = dataStream.getDataLifecycle().downsamplingMethod(); + affectedIndices.addAll( + waitForInProgressOrTriggerDownsampling( + dataStream, + backingIndexMeta, + downsamplingRounds, + downsamplingMethod, + project + ) + ); + } } + } catch (Exception e) { + logger.error( + () -> String.format( + Locale.ROOT, + "Data stream lifecycle failed to execute downsampling for data stream [%s]", + dataStream.getName() + ), + e + ); } - return affectedIndices; } @@ -861,6 +856,26 @@ 
private void clearErrorStoreForUnmanagedIndices(ProjectMetadata project, DataStr } } + private Set maybeExecuteRollover( + ProjectState projectState, + DataStream dataStream, + Set indicesToExcludeForRemainingRun, + Client client, + DataStreamLifecycleErrorStore errorStore + ) { + var dataRetention = getEffectiveRetention(dataStream, globalRetentionSettings, false); + var failuresRetention = getEffectiveRetention(dataStream, globalRetentionSettings, true); + // These are the pre-rollover write indices. They may or may not be the write index after maybeExecuteRollover has executed, + // depending on rollover criteria, for this reason we exclude them for the remaining run. + Set indicesToExclude = new HashSet<>(); + indicesToExclude.add(maybeExecuteRollover(projectState.metadata(), dataStream, dataRetention, false)); + Index failureStoreWriteIndex = maybeExecuteRollover(projectState.metadata(), dataStream, failuresRetention, true); + if (failureStoreWriteIndex != null) { + indicesToExclude.add(failureStoreWriteIndex); + } + return indicesToExclude; + } + @Nullable private Index maybeExecuteRollover( ProjectMetadata project, @@ -925,83 +940,99 @@ private Index maybeExecuteRollover( * This method sends requests to delete any indices in the datastream that exceed its retention policy. It returns the set of indices * it has sent delete requests for. * - * @param project The project metadata from which to get index metadata + * @param projectState The project state from which to get index metadata * @param dataStream The data stream * @param indicesToExcludeForRemainingRun Indices to exclude from retention even if it would be time for them to be deleted * @return The set of indices that delete requests have been sent for */ Set maybeExecuteRetention( - ProjectMetadata project, + ProjectState projectState, DataStream dataStream, - TimeValue dataRetention, - TimeValue failureRetention, - Set indicesToExcludeForRemainingRun + Set indicesToExcludeForRemainingRun, + Client client, + DataStreamLifecycleErrorStore errorStore ) { - if (dataRetention == null && failureRetention == null) { - return Set.of(); - } - List backingIndicesOlderThanRetention = dataStream.getIndicesPastRetention( - project::index, - nowSupplier, - dataRetention, - false - ); - List failureIndicesOlderThanRetention = dataStream.getIndicesPastRetention( - project::index, - nowSupplier, - failureRetention, - true - ); - if (backingIndicesOlderThanRetention.isEmpty() && failureIndicesOlderThanRetention.isEmpty()) { - return Set.of(); - } Set indicesToBeRemoved = new HashSet<>(); - if (backingIndicesOlderThanRetention.isEmpty() == false) { - assert dataStream.getDataLifecycle() != null : "data stream should have data lifecycle if we have 'old' indices"; - for (Index index : backingIndicesOlderThanRetention) { - if (indicesToExcludeForRemainingRun.contains(index) == false) { - IndexMetadata backingIndex = project.index(index); - assert backingIndex != null : "the data stream backing indices must exist"; - - IndexMetadata.DownsampleTaskStatus downsampleStatus = INDEX_DOWNSAMPLE_STATUS.get(backingIndex.getSettings()); - // we don't want to delete the source index if they have an in-progress downsampling operation because the - // target downsample index will remain in the system as a standalone index - if (downsampleStatus == STARTED) { - // there's an opportunity here to cancel downsampling and delete the source index now - logger.trace( - "Data stream lifecycle skips deleting index [{}] even though its retention period [{}] has lapsed " 
- + "because there's a downsampling operation currently in progress for this index. Current downsampling " - + "status is [{}]. When downsampling completes, DSL will delete this index.", - index.getName(), - dataRetention, - downsampleStatus - ); - } else { - // UNKNOWN is the default value, and has no real use. So index should be deleted - // SUCCESS meaning downsampling completed successfully and there is nothing in progress, so we can also delete + try { + var dataRetention = getEffectiveRetention(dataStream, globalRetentionSettings, false); + var failureRetention = getEffectiveRetention(dataStream, globalRetentionSettings, true); + if (dataRetention == null && failureRetention == null) { + return Set.of(); + } + ProjectMetadata project = projectState.metadata(); + List backingIndicesOlderThanRetention = dataStream.getIndicesPastRetention( + project::index, + nowSupplier, + dataRetention, + false + ); + List failureIndicesOlderThanRetention = dataStream.getIndicesPastRetention( + project::index, + nowSupplier, + failureRetention, + true + ); + if (backingIndicesOlderThanRetention.isEmpty() && failureIndicesOlderThanRetention.isEmpty()) { + return Set.of(); + } + if (backingIndicesOlderThanRetention.isEmpty() == false) { + assert dataStream.getDataLifecycle() != null : "data stream should have data lifecycle if we have 'old' indices"; + for (Index index : backingIndicesOlderThanRetention) { + if (indicesToExcludeForRemainingRun.contains(index) == false) { + IndexMetadata backingIndex = project.index(index); + assert backingIndex != null : "the data stream backing indices must exist"; + + IndexMetadata.DownsampleTaskStatus downsampleStatus = INDEX_DOWNSAMPLE_STATUS.get(backingIndex.getSettings()); + // we don't want to delete the source index if they have an in-progress downsampling operation because the + // target downsample index will remain in the system as a standalone index + if (downsampleStatus == STARTED) { + // there's an opportunity here to cancel downsampling and delete the source index now + logger.trace( + "Data stream lifecycle skips deleting index [{}] even though its retention period [{}] has lapsed " + + "because there's a downsampling operation currently in progress for this index. Current downsampling " + + "status is [{}]. When downsampling completes, DSL will delete this index.", + index.getName(), + dataRetention, + downsampleStatus + ); + } else { + // UNKNOWN is the default value, and has no real use. So index should be deleted + // SUCCESS meaning downsampling completed successfully and there is nothing in progress, so we can also delete + indicesToBeRemoved.add(index); + + // there's an opportunity here to batch the delete requests (i.e. delete 100 indices / request) + // let's start simple and reevaluate + String indexName = backingIndex.getIndex().getName(); + deleteIndexOnce(project.id(), indexName, "the lapsed [" + dataRetention + "] retention period"); + } + } + } + } + if (failureIndicesOlderThanRetention.isEmpty() == false) { + assert dataStream.getFailuresLifecycle() != null : "data stream should have failures lifecycle if we have 'old' indices"; + for (Index index : failureIndicesOlderThanRetention) { + if (indicesToExcludeForRemainingRun.contains(index) == false) { + IndexMetadata failureIndex = project.index(index); + assert failureIndex != null : "the data stream failure indices must exist"; indicesToBeRemoved.add(index); - // there's an opportunity here to batch the delete requests (i.e. 
delete 100 indices / request) // let's start simple and reevaluate - String indexName = backingIndex.getIndex().getName(); - deleteIndexOnce(project.id(), indexName, "the lapsed [" + dataRetention + "] retention period"); + String indexName = failureIndex.getIndex().getName(); + deleteIndexOnce(project.id(), indexName, "the lapsed [" + failureRetention + "] retention period"); } } } - } - if (failureIndicesOlderThanRetention.isEmpty() == false) { - assert dataStream.getFailuresLifecycle() != null : "data stream should have failures lifecycle if we have 'old' indices"; - for (Index index : failureIndicesOlderThanRetention) { - if (indicesToExcludeForRemainingRun.contains(index) == false) { - IndexMetadata failureIndex = project.index(index); - assert failureIndex != null : "the data stream failure indices must exist"; - indicesToBeRemoved.add(index); - // there's an opportunity here to batch the delete requests (i.e. delete 100 indices / request) - // let's start simple and reevaluate - String indexName = failureIndex.getIndex().getName(); - deleteIndexOnce(project.id(), indexName, "the lapsed [" + failureRetention + "] retention period"); - } - } + } catch (Exception e) { + // individual index errors would be reported via the API action listener for every delete call + // we could potentially record errors at a data stream level and expose it via the _data_stream API? + logger.error( + () -> String.format( + Locale.ROOT, + "Data stream lifecycle failed to execute retention for data stream [%s]", + dataStream.getName() + ), + e + ); } return indicesToBeRemoved; } @@ -1010,70 +1041,89 @@ Set maybeExecuteRetention( * This method force merges the given indices in the datastream. It writes a timestamp in the cluster state upon completion of the * force merge. 
*/ - private Set maybeExecuteForceMerge(ProjectMetadata project, List indices) { + private Set maybeExecuteForceMerge( + ProjectState projectState, + DataStream dataStream, + Set indicesToExcludeForRemainingRun, + Client client, + DataStreamLifecycleErrorStore errorStore + ) { Set affectedIndices = new HashSet<>(); - for (Index index : indices) { - IndexMetadata backingIndex = project.index(index); - assert backingIndex != null : "the data stream backing indices must exist"; - String indexName = index.getName(); - boolean alreadyForceMerged = isForceMergeComplete(backingIndex); - if (alreadyForceMerged) { - logger.trace("Already force merged {}", indexName); - continue; - } + try { + ProjectMetadata project = projectState.metadata(); + List indices = getTargetIndices(dataStream, indicesToExcludeForRemainingRun, project::index, true); + for (Index index : indices) { + IndexMetadata backingIndex = project.index(index); + assert backingIndex != null : "the data stream backing indices must exist"; + String indexName = index.getName(); + boolean alreadyForceMerged = isForceMergeComplete(backingIndex); + if (alreadyForceMerged) { + logger.trace("Already force merged {}", indexName); + continue; + } - ByteSizeValue configuredFloorSegmentMerge = MergePolicyConfig.INDEX_MERGE_POLICY_FLOOR_SEGMENT_SETTING.get( - backingIndex.getSettings() - ); - Integer configuredMergeFactor = MergePolicyConfig.INDEX_MERGE_POLICY_MERGE_FACTOR_SETTING.get(backingIndex.getSettings()); - if ((configuredFloorSegmentMerge == null || configuredFloorSegmentMerge.equals(targetMergePolicyFloorSegment) == false) - || (configuredMergeFactor == null || configuredMergeFactor.equals(targetMergePolicyFactor) == false)) { - UpdateSettingsRequest updateMergePolicySettingsRequest = new UpdateSettingsRequest(); - updateMergePolicySettingsRequest.indices(indexName); - updateMergePolicySettingsRequest.settings( - Settings.builder() - .put(MergePolicyConfig.INDEX_MERGE_POLICY_FLOOR_SEGMENT_SETTING.getKey(), targetMergePolicyFloorSegment) - .put(MergePolicyConfig.INDEX_MERGE_POLICY_MERGE_FACTOR_SETTING.getKey(), targetMergePolicyFactor) + ByteSizeValue configuredFloorSegmentMerge = MergePolicyConfig.INDEX_MERGE_POLICY_FLOOR_SEGMENT_SETTING.get( + backingIndex.getSettings() ); - updateMergePolicySettingsRequest.masterNodeTimeout(TimeValue.MAX_VALUE); - affectedIndices.add(index); - transportActionsDeduplicator.executeOnce( - Tuple.tuple(project.id(), updateMergePolicySettingsRequest), - new ErrorRecordingActionListener( - TransportUpdateSettingsAction.TYPE.name(), - project.id(), - indexName, - errorStore, - Strings.format( - "Data stream lifecycle encountered an error trying to to update settings [%s] for index [%s]", - updateMergePolicySettingsRequest.settings().keySet(), - indexName + Integer configuredMergeFactor = MergePolicyConfig.INDEX_MERGE_POLICY_MERGE_FACTOR_SETTING.get(backingIndex.getSettings()); + if ((configuredFloorSegmentMerge == null || configuredFloorSegmentMerge.equals(targetMergePolicyFloorSegment) == false) + || (configuredMergeFactor == null || configuredMergeFactor.equals(targetMergePolicyFactor) == false)) { + UpdateSettingsRequest updateMergePolicySettingsRequest = new UpdateSettingsRequest(); + updateMergePolicySettingsRequest.indices(indexName); + updateMergePolicySettingsRequest.settings( + Settings.builder() + .put(MergePolicyConfig.INDEX_MERGE_POLICY_FLOOR_SEGMENT_SETTING.getKey(), targetMergePolicyFloorSegment) + .put(MergePolicyConfig.INDEX_MERGE_POLICY_MERGE_FACTOR_SETTING.getKey(), targetMergePolicyFactor) 
+ ); + updateMergePolicySettingsRequest.masterNodeTimeout(TimeValue.MAX_VALUE); + affectedIndices.add(index); + transportActionsDeduplicator.executeOnce( + Tuple.tuple(project.id(), updateMergePolicySettingsRequest), + new ErrorRecordingActionListener( + TransportUpdateSettingsAction.TYPE.name(), + project.id(), + indexName, + errorStore, + Strings.format( + "Data stream lifecycle encountered an error trying to to update settings [%s] for index [%s]", + updateMergePolicySettingsRequest.settings().keySet(), + indexName + ), + signallingErrorRetryInterval ), - signallingErrorRetryInterval - ), - (req, reqListener) -> updateIndexSetting(project.id(), updateMergePolicySettingsRequest, reqListener) - ); - } else { - affectedIndices.add(index); - ForceMergeRequest forceMergeRequest = new ForceMergeRequest(indexName); - // time to force merge the index - transportActionsDeduplicator.executeOnce( - Tuple.tuple(project.id(), new ForceMergeRequestWrapper(forceMergeRequest)), - new ErrorRecordingActionListener( - ForceMergeAction.NAME, - project.id(), - indexName, - errorStore, - Strings.format( - "Data stream lifecycle encountered an error trying to force merge index [%s]. Data stream lifecycle will " - + "attempt to force merge the index on its next run.", - indexName + (req, reqListener) -> updateIndexSetting(project.id(), updateMergePolicySettingsRequest, reqListener) + ); + } else { + affectedIndices.add(index); + ForceMergeRequest forceMergeRequest = new ForceMergeRequest(indexName); + // time to force merge the index + transportActionsDeduplicator.executeOnce( + Tuple.tuple(project.id(), new ForceMergeRequestWrapper(forceMergeRequest)), + new ErrorRecordingActionListener( + ForceMergeAction.NAME, + project.id(), + indexName, + errorStore, + Strings.format( + "Data stream lifecycle encountered an error trying to force merge index [%s]. Data stream lifecycle will " + + "attempt to force merge the index on its next run.", + indexName + ), + signallingErrorRetryInterval ), - signallingErrorRetryInterval - ), - (req, reqListener) -> forceMergeIndex(project.id(), forceMergeRequest, reqListener) - ); + (req, reqListener) -> forceMergeIndex(project.id(), forceMergeRequest, reqListener) + ); + } } + } catch (Exception e) { + logger.error( + () -> String.format( + Locale.ROOT, + "Data stream lifecycle failed to execute force merge for data stream [%s]", + dataStream.getName() + ), + e + ); } return affectedIndices; } @@ -1374,9 +1424,15 @@ public void onFailure(Exception e) { * success or failure. */ private void setForceMergeCompletedTimestamp(ProjectId projectId, String targetIndex, ActionListener listener) { - forceMergeClusterStateUpdateTaskQueue.submitTask( + dlmCustomMetadataClusterStateUpdateTaskQueue.submitTask( Strings.format("Adding force merge complete marker to cluster state for [%s]", targetIndex), - new UpdateForceMergeCompleteTask(listener, projectId, targetIndex, threadPool), + new UpdateDataStreamLifecycleCustomMetadataTask( + listener, + projectId, + targetIndex, + FORCE_MERGE_COMPLETED_TIMESTAMP_METADATA_KEY, + Long.toString(threadPool.absoluteTimeInMillis()) + ), null ); } @@ -1600,24 +1656,32 @@ public void setNowSupplier(LongSupplier nowSupplier) { } /** - * This is a ClusterStateTaskListener that writes the force_merge_completed_timestamp into the cluster state. It is meant to run in - * STATE_UPDATE_TASK_EXECUTOR. + * This is a ClusterStateTaskListener that writes the key and value into the cluster state as custom metadata within targetIndex's + * index metadata. 
It is meant to run in STATE_UPDATE_TASK_EXECUTOR. */ - static class UpdateForceMergeCompleteTask implements ClusterStateTaskListener { + static class UpdateDataStreamLifecycleCustomMetadataTask implements ClusterStateTaskListener { private final ActionListener listener; private final ProjectId projectId; private final String targetIndex; - private final ThreadPool threadPool; + private final String key; + private final String value; - UpdateForceMergeCompleteTask(ActionListener listener, ProjectId projectId, String targetIndex, ThreadPool threadPool) { + UpdateDataStreamLifecycleCustomMetadataTask( + ActionListener listener, + ProjectId projectId, + String targetIndex, + String key, + String value + ) { this.listener = listener; this.projectId = projectId; this.targetIndex = targetIndex; - this.threadPool = threadPool; + this.key = key; + this.value = value; } ClusterState execute(ClusterState currentState) throws Exception { - logger.debug("Updating cluster state with force merge complete marker for {}", targetIndex); + logger.debug("Updating cluster state with {}} marker for {}", key, targetIndex); final var currentProject = currentState.metadata().getProject(projectId); IndexMetadata indexMetadata = currentProject.index(targetIndex); Map customMetadata = indexMetadata.getCustomData(LIFECYCLE_CUSTOM_INDEX_METADATA_KEY); @@ -1625,7 +1689,7 @@ ClusterState execute(ClusterState currentState) throws Exception { if (customMetadata != null) { newCustomMetadata.putAll(customMetadata); } - newCustomMetadata.put(FORCE_MERGE_COMPLETED_TIMESTAMP_METADATA_KEY, Long.toString(threadPool.absoluteTimeInMillis())); + newCustomMetadata.put(key, value); IndexMetadata updatededIndexMetadata = new IndexMetadata.Builder(indexMetadata).putCustom( LIFECYCLE_CUSTOM_INDEX_METADATA_KEY, newCustomMetadata @@ -1694,4 +1758,5 @@ public int hashCode() { ); } } + } diff --git a/modules/data-streams/src/test/java/org/elasticsearch/datastreams/lifecycle/DataStreamLifecycleServiceTests.java b/modules/data-streams/src/test/java/org/elasticsearch/datastreams/lifecycle/DataStreamLifecycleServiceTests.java index 35c709462d5d9..d7471854660b1 100644 --- a/modules/data-streams/src/test/java/org/elasticsearch/datastreams/lifecycle/DataStreamLifecycleServiceTests.java +++ b/modules/data-streams/src/test/java/org/elasticsearch/datastreams/lifecycle/DataStreamLifecycleServiceTests.java @@ -33,6 +33,7 @@ import org.elasticsearch.cluster.ClusterName; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.EmptyClusterInfoService; +import org.elasticsearch.cluster.ProjectState; import org.elasticsearch.cluster.TestShardRoutingRoleStrategies; import org.elasticsearch.cluster.block.ClusterBlock; import org.elasticsearch.cluster.block.ClusterBlocks; @@ -356,7 +357,8 @@ public void testMergePolicyNotExecutedForTSIndicesWithinTimeBounds() { clusterState = ClusterState.builder(clusterState).putProjectMetadata(builder).build(); dataStreamLifecycleService.run(clusterState); - // There should be two client requests: one rollover, and one to update the merge policy settings. N.B. The merge policy settings + // There should be two client requests: one rollover, and one to update the merge policy settings. One of the non-write indices is + // still within the time bounds to be written to, so it is not force merged. N.B. The merge policy settings // will always be updated before the force merge is done, see testMergePolicySettingsAreConfiguredBeforeForcemerge. 
assertThat(clientSeenRequests.size(), is(2)); assertThat(clientSeenRequests.get(0), instanceOf(RolloverRequest.class)); @@ -959,12 +961,14 @@ public void onFailure(Exception e) { }; final var projectId = randomProjectIdOrDefault(); String targetIndex = randomAlphaOfLength(20); - DataStreamLifecycleService.UpdateForceMergeCompleteTask task = new DataStreamLifecycleService.UpdateForceMergeCompleteTask( - listener, - projectId, - targetIndex, - threadPool - ); + DataStreamLifecycleService.UpdateDataStreamLifecycleCustomMetadataTask task = + new DataStreamLifecycleService.UpdateDataStreamLifecycleCustomMetadataTask( + listener, + projectId, + targetIndex, + FORCE_MERGE_COMPLETED_TIMESTAMP_METADATA_KEY, + Long.toString(threadPool.absoluteTimeInMillis()) + ); { Exception exception = new RuntimeException("task failed"); task.onFailure(exception); @@ -1246,7 +1250,7 @@ public void testDownsampling() throws Exception { Set affectedIndices = dataStreamLifecycleService.maybeExecuteDownsampling( clusterService.state().projectState(projectId), dataStream, - List.of(firstGenIndex) + Set.of() ); assertThat(affectedIndices, is(Set.of(firstGenIndex))); @@ -1275,7 +1279,7 @@ public void testDownsampling() throws Exception { affectedIndices = dataStreamLifecycleService.maybeExecuteDownsampling( clusterService.state().projectState(projectId), dataStream, - List.of(firstGenIndex) + Set.of() ); assertThat(affectedIndices, is(Set.of(firstGenIndex))); assertThat(clientSeenRequests.size(), is(2)); @@ -1310,7 +1314,7 @@ public void testDownsampling() throws Exception { affectedIndices = dataStreamLifecycleService.maybeExecuteDownsampling( clusterService.state().projectState(projectId), dataStream, - List.of(firstGenIndex) + Set.of() ); assertThat(affectedIndices, is(Set.of(firstGenIndex))); // still only 2 witnessed requests, nothing extra @@ -1343,7 +1347,7 @@ public void testDownsampling() throws Exception { affectedIndices = dataStreamLifecycleService.maybeExecuteDownsampling( clusterService.state().projectState(projectId), dataStream, - List.of(firstGenIndex) + Set.of() ); assertThat(affectedIndices, is(Set.of(firstGenIndex))); assertBusy(() -> { @@ -1417,7 +1421,7 @@ public void testDownsamplingWhenTargetIndexNameClashYieldsException() throws Exc final var projectState = clusterService.state().projectState(projectId); Index firstGenIndex = projectState.metadata().index(firstGenIndexName).getIndex(); - dataStreamLifecycleService.maybeExecuteDownsampling(projectState, dataStream, List.of(firstGenIndex)); + dataStreamLifecycleService.maybeExecuteDownsampling(projectState, dataStream, Set.of()); assertThat(clientSeenRequests.size(), is(0)); ErrorEntry error = dataStreamLifecycleService.getErrorStore().getError(projectId, firstGenIndexName); @@ -1442,16 +1446,20 @@ public void testTimeSeriesIndicesStillWithinTimeBounds() { dataStreamName, List.of(Tuple.tuple(start1, end1), Tuple.tuple(start2, end2), Tuple.tuple(start3, end3)) ); - final var project = clusterState.metadata().getProject(projectId); - DataStream dataStream = project.dataStreams().get(dataStreamName); - + final ProjectState project = clusterState.projectState(projectId); + DataStream dataStream = project.metadata().dataStreams().get(dataStreamName); + dataStream = dataStream.copy() + .setName(dataStreamName) + .setGeneration(dataStream.getGeneration() + 1) + .setLifecycle(DataStreamLifecycle.dataLifecycleBuilder().dataRetention(TimeValue.ZERO).build()) + .build(); { - // test for an index for which `now` is outside its time bounds - Index firstGenIndex 
= dataStream.getIndices().get(0); + // test for an index for which `now` is outside its time bounds by excluding the two that are not Set indices = DataStreamLifecycleService.timeSeriesIndicesStillWithinTimeBounds( // the end_time for the first generation has lapsed project, - List.of(firstGenIndex), + dataStream, + Set.of(dataStream.getIndices().get(1), dataStream.getIndices().get(2)), currentTime::toEpochMilli ); assertThat(indices.size(), is(0)); @@ -1461,7 +1469,8 @@ public void testTimeSeriesIndicesStillWithinTimeBounds() { Set indices = DataStreamLifecycleService.timeSeriesIndicesStillWithinTimeBounds( // the end_time for the first generation has lapsed, but the other 2 generations are still within bounds project, - dataStream.getIndices(), + dataStream, + Set.of(), currentTime::toEpochMilli ); assertThat(indices.size(), is(2)); @@ -1469,16 +1478,33 @@ public void testTimeSeriesIndicesStillWithinTimeBounds() { } { + ProjectMetadata.Builder projectMetadataBuilder = ProjectMetadata.builder(project.metadata()); // non time_series indices are not within time bounds (they don't have any) - IndexMetadata indexMeta = IndexMetadata.builder(randomAlphaOfLengthBetween(10, 30)) - .settings(indexSettings(1, 1).put(IndexMetadata.SETTING_INDEX_VERSION_CREATED.getKey(), IndexVersion.current())) - .build(); - - ProjectMetadata newProject = ProjectMetadata.builder(project).put(indexMeta, true).build(); + for (Index index : dataStream.getIndices()) { + IndexMetadata indexMeta = IndexMetadata.builder(index.getName()) + .settings( + indexSettings(1, 1).put(IndexMetadata.SETTING_INDEX_VERSION_CREATED.getKey(), IndexVersion.current()) + .put(IndexSettings.MODE.getKey(), IndexMode.STANDARD) + ) + .build(); + projectMetadataBuilder.put(indexMeta, true); + } + List standardIndices = dataStream.getIndices() + .stream() + .map(index -> new Index(index.getName(), index.getUUID())) + .toList(); + ProjectState newProject = project.updateProject(projectMetadataBuilder.build()); + dataStream = dataStream.copy() + .setName(dataStreamName) + .setGeneration(dataStream.getGeneration() + 1) + .setBackingIndices(DataStream.DataStreamIndices.backingIndicesBuilder(standardIndices).build()) + .setIndexMode(IndexMode.STANDARD) + .build(); Set indices = DataStreamLifecycleService.timeSeriesIndicesStillWithinTimeBounds( newProject, - List.of(indexMeta.getIndex()), + dataStream, + Set.of(), currentTime::toEpochMilli ); assertThat(indices.size(), is(0)); @@ -1593,38 +1619,26 @@ public void testMaybeExecuteRetentionSuccessfulDownsampledIndex() { String dataStreamName = randomAlphaOfLength(10).toLowerCase(Locale.ROOT); final var projectId = randomProjectIdOrDefault(); ClusterState state = downsampleSetup(projectId, dataStreamName, SUCCESS); - final var project = state.metadata().getProject(projectId); - DataStream dataStream = project.dataStreams().get(dataStreamName); + final var project = state.projectState(projectId); + DataStream dataStream = project.metadata().dataStreams().get(dataStreamName); String firstGenIndexName = dataStream.getIndices().getFirst().getName(); TimeValue dataRetention = dataStream.getDataLifecycle().dataRetention(); // Executing the method to be tested: - Set indicesToBeRemoved = dataStreamLifecycleService.maybeExecuteRetention( - project, - dataStream, - dataRetention, - null, - Set.of() - ); - assertThat(indicesToBeRemoved, contains(project.index(firstGenIndexName).getIndex())); + Set indicesToBeRemoved = dataStreamLifecycleService.maybeExecuteRetention(project, dataStream, Set.of()); + 
assertThat(indicesToBeRemoved, contains(project.metadata().index(firstGenIndexName).getIndex())); } public void testMaybeExecuteRetentionDownsampledIndexInProgress() { String dataStreamName = randomAlphaOfLength(10).toLowerCase(Locale.ROOT); final var projectId = randomProjectIdOrDefault(); ClusterState state = downsampleSetup(projectId, dataStreamName, STARTED); - final var project = state.metadata().getProject(projectId); - DataStream dataStream = project.dataStreams().get(dataStreamName); + final var project = state.projectState(projectId); + DataStream dataStream = project.metadata().dataStreams().get(dataStreamName); TimeValue dataRetention = dataStream.getDataLifecycle().dataRetention(); // Executing the method to be tested: - Set indicesToBeRemoved = dataStreamLifecycleService.maybeExecuteRetention( - project, - dataStream, - dataRetention, - null, - Set.of() - ); + Set indicesToBeRemoved = dataStreamLifecycleService.maybeExecuteRetention(project, dataStream, Set.of()); assertThat(indicesToBeRemoved, empty()); } @@ -1632,20 +1646,14 @@ public void testMaybeExecuteRetentionDownsampledUnknown() { String dataStreamName = randomAlphaOfLength(10).toLowerCase(Locale.ROOT); final var projectId = randomProjectIdOrDefault(); ClusterState state = downsampleSetup(projectId, dataStreamName, UNKNOWN); - final var project = state.metadata().getProject(projectId); - DataStream dataStream = project.dataStreams().get(dataStreamName); + final var project = state.projectState(projectId); + DataStream dataStream = project.metadata().dataStreams().get(dataStreamName); String firstGenIndexName = dataStream.getIndices().getFirst().getName(); TimeValue dataRetention = dataStream.getDataLifecycle().dataRetention(); // Executing the method to be tested: - Set indicesToBeRemoved = dataStreamLifecycleService.maybeExecuteRetention( - project, - dataStream, - dataRetention, - null, - Set.of() - ); - assertThat(indicesToBeRemoved, contains(project.index(firstGenIndexName).getIndex())); + Set indicesToBeRemoved = dataStreamLifecycleService.maybeExecuteRetention(project, dataStream, Set.of()); + assertThat(indicesToBeRemoved, contains(project.metadata().index(firstGenIndexName).getIndex())); } private ClusterState downsampleSetup(ProjectId projectId, String dataStreamName, IndexMetadata.DownsampleTaskStatus status) { diff --git a/x-pack/plugin/data-stream-lifecycle/build.gradle b/x-pack/plugin/data-stream-lifecycle/build.gradle new file mode 100644 index 0000000000000..3c27bc34beb9c --- /dev/null +++ b/x-pack/plugin/data-stream-lifecycle/build.gradle @@ -0,0 +1,23 @@ +apply plugin: 'elasticsearch.internal-es-plugin' +apply plugin: 'elasticsearch.internal-cluster-test' +apply plugin: 'elasticsearch.internal-java-rest-test' + +esplugin { + name = 'x-pack-data-streams' + description = 'Elasticsearch Expanded Pack Plugin - Data Stream Lifecycle' + classname = 'org.elasticsearch.xpack.datastreams.XPackDataStreamsPlugin' + extendedPlugins = ['data-streams', 'x-pack-core'] + hasNativeController =false + requiresKeystore =true +} +base { + archivesName = 'x-pack-data-stream-lifecycle' +} + +dependencies { + compileOnly project(path: xpackModule('core')) + compileOnly project(':modules:data-streams') + testImplementation(testArtifact(project(xpackModule('core')))) +} + +addQaCheckDependencies(project) diff --git a/x-pack/plugin/data-stream-lifecycle/src/main/java/org/elasticsearch/xpack/datastreams/SearchableSnapshotLifecycleActions.java 
b/x-pack/plugin/data-stream-lifecycle/src/main/java/org/elasticsearch/xpack/datastreams/SearchableSnapshotLifecycleActions.java new file mode 100644 index 0000000000000..d17b5d24c38ef --- /dev/null +++ b/x-pack/plugin/data-stream-lifecycle/src/main/java/org/elasticsearch/xpack/datastreams/SearchableSnapshotLifecycleActions.java @@ -0,0 +1,139 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ +package org.elasticsearch.xpack.datastreams; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.elasticsearch.client.internal.Client; +import org.elasticsearch.cluster.ProjectState; +import org.elasticsearch.cluster.metadata.DataStream; +import org.elasticsearch.cluster.metadata.IndexMetadata; +import org.elasticsearch.cluster.metadata.ProjectMetadata; +import org.elasticsearch.datastreams.lifecycle.AdditionalDataStreamLifecycleActions; +import org.elasticsearch.datastreams.lifecycle.DataStreamLifecycleAction; +import org.elasticsearch.datastreams.lifecycle.DataStreamLifecycleErrorStore; +import org.elasticsearch.index.Index; +import org.elasticsearch.license.License; +import org.elasticsearch.xpack.core.XPackPlugin; + +import java.util.ArrayList; +import java.util.List; +import java.util.Set; +import java.util.function.Function; + +public class SearchableSnapshotLifecycleActions implements AdditionalDataStreamLifecycleActions { + + private static final Logger logger = LogManager.getLogger(SearchableSnapshotLifecycleActions.class); + + public List getDataStreamLifecycleActions() { + return List.of(this::cloneIndex, this::forceMergeToSingleSegment, this::snapshot, this::createSearchableSnapshotIndex); + } + + private Set cloneIndex( + ProjectState projectState, + DataStream dataStream, + Set indicesToExcludeForRemainingRun, + Client client, + DataStreamLifecycleErrorStore errorStore + ) { + if (XPackPlugin.getSharedLicenseState().isAllowedByLicense(License.OperationMode.ENTERPRISE)) { + ProjectMetadata project = projectState.metadata(); + List indices = getTargetIndices(dataStream, indicesToExcludeForRemainingRun, project::index, true); + for (Index index : indices) { + logger.info("Cloning index {}", index.getName()); + } + return Set.of(); + } else { + return Set.of(); + } + } + + private Set forceMergeToSingleSegment( + ProjectState projectState, + DataStream dataStream, + Set indicesToExcludeForRemainingRun, + Client client, + DataStreamLifecycleErrorStore errorStore + ) { + if (XPackPlugin.getSharedLicenseState().isAllowedByLicense(License.OperationMode.ENTERPRISE)) { + ProjectMetadata project = projectState.metadata(); + List indices = getTargetIndices(dataStream, indicesToExcludeForRemainingRun, project::index, true); + for (Index index : indices) { + logger.info("Force merging index {}", index.getName()); + } + return Set.of(); + } else { + return Set.of(); + } + } + + private Set snapshot( + ProjectState projectState, + DataStream dataStream, + Set indicesToExcludeForRemainingRun, + Client client, + DataStreamLifecycleErrorStore errorStore + ) { + if (XPackPlugin.getSharedLicenseState().isAllowedByLicense(License.OperationMode.ENTERPRISE)) { + ProjectMetadata project = projectState.metadata(); + List indices = getTargetIndices(dataStream, indicesToExcludeForRemainingRun, project::index, true); + for (Index index : indices) { + 
logger.info("Snapshotting index {}", index.getName()); + } + return Set.of(); + } else { + return Set.of(); + } + } + + private Set createSearchableSnapshotIndex( + ProjectState projectState, + DataStream dataStream, + Set indicesToExcludeForRemainingRun, + Client client, + DataStreamLifecycleErrorStore errorStore + ) { + if (XPackPlugin.getSharedLicenseState().isAllowedByLicense(License.OperationMode.ENTERPRISE)) { + ProjectMetadata project = projectState.metadata(); + List indices = getTargetIndices(dataStream, indicesToExcludeForRemainingRun, project::index, true); + for (Index index : indices) { + logger.info("Creating searchable snapshot index {}", index.getName()); + } + return Set.of(); + } else { + return Set.of(); + } + } + + /** + * Returns the data stream lifecycle managed indices that are not part of the set of indices to exclude. + */ + // For testing + static List getTargetIndices( + DataStream dataStream, + Set indicesToExcludeForRemainingRun, + Function indexMetadataSupplier, + boolean withFailureStore + ) { + List targetIndices = new ArrayList<>(); + for (Index index : dataStream.getIndices()) { + if (dataStream.isIndexManagedByDataStreamLifecycle(index, indexMetadataSupplier) + && indicesToExcludeForRemainingRun.contains(index) == false) { + targetIndices.add(index); + } + } + if (withFailureStore && dataStream.getFailureIndices().isEmpty() == false) { + for (Index index : dataStream.getFailureIndices()) { + if (dataStream.isIndexManagedByDataStreamLifecycle(index, indexMetadataSupplier) + && indicesToExcludeForRemainingRun.contains(index) == false) { + targetIndices.add(index); + } + } + } + return targetIndices; + } +} diff --git a/x-pack/plugin/data-stream-lifecycle/src/main/java/org/elasticsearch/xpack/datastreams/XPackDataStreamsPlugin.java b/x-pack/plugin/data-stream-lifecycle/src/main/java/org/elasticsearch/xpack/datastreams/XPackDataStreamsPlugin.java new file mode 100644 index 0000000000000..06e41a4b155df --- /dev/null +++ b/x-pack/plugin/data-stream-lifecycle/src/main/java/org/elasticsearch/xpack/datastreams/XPackDataStreamsPlugin.java @@ -0,0 +1,14 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +package org.elasticsearch.xpack.datastreams; + +import org.elasticsearch.plugins.Plugin; + +public class XPackDataStreamsPlugin extends Plugin { + +} diff --git a/x-pack/plugin/data-stream-lifecycle/src/main/resources/META-INF/services/org.elasticsearch.datastreams.lifecycle.AdditionalDataStreamLifecycleActions b/x-pack/plugin/data-stream-lifecycle/src/main/resources/META-INF/services/org.elasticsearch.datastreams.lifecycle.AdditionalDataStreamLifecycleActions new file mode 100644 index 0000000000000..b0285e4b243a4 --- /dev/null +++ b/x-pack/plugin/data-stream-lifecycle/src/main/resources/META-INF/services/org.elasticsearch.datastreams.lifecycle.AdditionalDataStreamLifecycleActions @@ -0,0 +1 @@ +org.elasticsearch.xpack.datastreams.SearchableSnapshotLifecycleActions