diff --git a/docs/reference/indices/data-stream-stats.asciidoc b/docs/reference/indices/data-stream-stats.asciidoc index 3ed285abc035a..c89a2a99a0059 100644 --- a/docs/reference/indices/data-stream-stats.asciidoc +++ b/docs/reference/indices/data-stream-stats.asciidoc @@ -167,8 +167,16 @@ of the following conditions are met: * Backing indices with a <> contain higher `@timestamp` values. ===== -==== +`write_load_forecast_per_shard`:: +(Optional, double) +The forecasted load of a single shard of the data stream's write index. + +`write_load_forecast_per_index`:: +(Optional, double) +The forecasted load of all shards of the data stream's write index. + +==== [[data-stream-stats-api-example]] ==== {api-examples-title} @@ -197,14 +205,18 @@ The API returns the following response. "backing_indices": 3, "store_size": "3.7kb", "store_size_bytes": 3772, - "maximum_timestamp": 1607512028000 + "maximum_timestamp": 1607512028000, + "write_load_forecast_per_shard": 0.3453, + "write_load_forecast_per_index": 0.6906 }, { "data_stream": "my-data-stream-two", "backing_indices": 2, "store_size": "3.4kb", "store_size_bytes": 3496, - "maximum_timestamp": 1607425567000 + "maximum_timestamp": 1607425567000, + "write_load_forecast_per_shard": 0.1254, + "write_load_forecast_per_index": 0.3762 } ] } diff --git a/modules/data-streams/src/main/java/org/elasticsearch/datastreams/action/DataStreamsStatsTransportAction.java b/modules/data-streams/src/main/java/org/elasticsearch/datastreams/action/DataStreamsStatsTransportAction.java index 1b18f8b799f4d..8fa03f874a526 100644 --- a/modules/data-streams/src/main/java/org/elasticsearch/datastreams/action/DataStreamsStatsTransportAction.java +++ b/modules/data-streams/src/main/java/org/elasticsearch/datastreams/action/DataStreamsStatsTransportAction.java @@ -21,13 +21,16 @@ import org.elasticsearch.cluster.block.ClusterBlockLevel; import org.elasticsearch.cluster.metadata.DataStream; import org.elasticsearch.cluster.metadata.IndexAbstraction; +import org.elasticsearch.cluster.metadata.IndexMetadata; import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; import org.elasticsearch.cluster.routing.ShardRouting; import org.elasticsearch.cluster.routing.ShardsIterator; +import org.elasticsearch.cluster.routing.allocation.WriteLoadForecaster; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.unit.ByteSizeValue; +import org.elasticsearch.core.Nullable; import org.elasticsearch.index.Index; import org.elasticsearch.index.IndexService; import org.elasticsearch.index.engine.Engine; @@ -43,6 +46,7 @@ import java.util.HashSet; import java.util.List; import java.util.Map; +import java.util.OptionalDouble; import java.util.Set; import java.util.SortedMap; @@ -52,6 +56,7 @@ public class DataStreamsStatsTransportAction extends TransportBroadcastByNodeAct DataStreamsStatsAction.DataStreamShardStats> { private final IndicesService indicesService; + private final WriteLoadForecaster writeLoadForecaster; @Inject public DataStreamsStatsTransportAction( @@ -59,7 +64,8 @@ public DataStreamsStatsTransportAction( TransportService transportService, IndicesService indicesService, ActionFilters actionFilters, - IndexNameExpressionResolver indexNameExpressionResolver + IndexNameExpressionResolver indexNameExpressionResolver, + WriteLoadForecaster writeLoadForecaster ) { super( DataStreamsStatsAction.NAME, @@ -71,6 +77,7 @@ public DataStreamsStatsTransportAction( transportService.getThreadPool().executor(ThreadPool.Names.MANAGEMENT) ); this.indicesService = indicesService; + this.writeLoadForecaster = writeLoadForecaster; } @Override @@ -167,6 +174,16 @@ protected DataStreamsStatsAction.DataStreamShardStats readShardResult(StreamInpu if (indexAbstraction.getType() == IndexAbstraction.Type.DATA_STREAM) { DataStream dataStream = (DataStream) indexAbstraction; AggregatedStats stats = aggregatedDataStreamsStats.computeIfAbsent(dataStream.getName(), s -> new AggregatedStats()); + Index writeIndex = indexAbstraction.getWriteIndex(); + if (writeIndex != null) { + IndexMetadata writeIndexMetadata = clusterState.getMetadata().index(writeIndex); + OptionalDouble writeLoadForecast = writeLoadForecaster.getForecastedWriteLoad(writeIndexMetadata); + if (writeLoadForecast.isPresent()) { + stats.writeLoadForecastPerShard = writeLoadForecast.getAsDouble(); + stats.writeLoadForecastPerIndex = stats.writeLoadForecastPerShard * writeIndexMetadata.getNumberOfShards(); + } + } + dataStream.getIndices().stream().map(Index::getName).forEach(index -> { stats.backingIndices.add(index); allBackingIndices.add(index); @@ -227,7 +244,9 @@ public DataStreamsStatsAction.Response newResponse( entry.getKey(), entry.getValue().backingIndices.size(), ByteSizeValue.ofBytes(entry.getValue().storageBytes), - entry.getValue().maxTimestamp + entry.getValue().maxTimestamp, + entry.getValue().writeLoadForecastPerShard, + entry.getValue().writeLoadForecastPerIndex ) ) .toArray(DataStreamsStatsAction.DataStreamStats[]::new); @@ -249,5 +268,9 @@ private static class AggregatedStats { Set backingIndices = new HashSet<>(); long storageBytes = 0L; long maxTimestamp = 0L; + @Nullable + Double writeLoadForecastPerIndex = null; + @Nullable + Double writeLoadForecastPerShard = null; } } diff --git a/modules/data-streams/src/test/java/org/elasticsearch/datastreams/DataStreamsStatsTests.java b/modules/data-streams/src/test/java/org/elasticsearch/datastreams/DataStreamsStatsTests.java index 2204c82670e69..60a07fb7a1ab4 100644 --- a/modules/data-streams/src/test/java/org/elasticsearch/datastreams/DataStreamsStatsTests.java +++ b/modules/data-streams/src/test/java/org/elasticsearch/datastreams/DataStreamsStatsTests.java @@ -21,10 +21,17 @@ import org.elasticsearch.action.support.IndicesOptions; import org.elasticsearch.cluster.health.ClusterHealthStatus; import org.elasticsearch.cluster.metadata.ComposableIndexTemplate; +import org.elasticsearch.cluster.metadata.IndexMetadata; +import org.elasticsearch.cluster.metadata.Metadata; import org.elasticsearch.cluster.metadata.Template; +import org.elasticsearch.cluster.routing.allocation.WriteLoadForecaster; import org.elasticsearch.common.compress.CompressedXContent; +import org.elasticsearch.common.settings.ClusterSettings; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.plugins.ClusterPlugin; import org.elasticsearch.plugins.Plugin; import org.elasticsearch.test.ESSingleNodeTestCase; +import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.xcontent.json.JsonXContent; import org.junit.After; @@ -34,17 +41,20 @@ import java.util.List; import java.util.Locale; import java.util.Map; +import java.util.OptionalDouble; import java.util.Set; import java.util.concurrent.TimeUnit; import static java.lang.Math.max; +import static org.elasticsearch.datastreams.DataStreamsStatsTests.StaticWriteLoadForecasterPlugin.WRITE_LOAD_FORECAST_PER_SHARD; import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked; +import static org.hamcrest.Matchers.is; public class DataStreamsStatsTests extends ESSingleNodeTestCase { @Override protected Collection> getPlugins() { - return List.of(DataStreamsPlugin.class); + return List.of(DataStreamsPlugin.class, StaticWriteLoadForecasterPlugin.class); } private final Set createdDataStreams = new HashSet<>(); @@ -86,6 +96,8 @@ public void testStatsEmptyDataStream() throws Exception { assertEquals(0L, stats.getDataStreams()[0].getMaximumTimestamp()); assertNotEquals(0L, stats.getDataStreams()[0].getStoreSize().getBytes()); assertEquals(stats.getTotalStoreSize().getBytes(), stats.getDataStreams()[0].getStoreSize().getBytes()); + assertThat(null, is(stats.getDataStreams()[0].getWriteLoadForecastPerShard())); + assertThat(null, is(stats.getDataStreams()[0].getWriteLoadForecastPerIndex())); } public void testStatsExistingDataStream() throws Exception { @@ -104,6 +116,8 @@ public void testStatsExistingDataStream() throws Exception { assertEquals(timestamp, stats.getDataStreams()[0].getMaximumTimestamp()); assertNotEquals(0L, stats.getDataStreams()[0].getStoreSize().getBytes()); assertEquals(stats.getTotalStoreSize().getBytes(), stats.getDataStreams()[0].getStoreSize().getBytes()); + assertThat(null, is(stats.getDataStreams()[0].getWriteLoadForecastPerShard())); + assertThat(null, is(stats.getDataStreams()[0].getWriteLoadForecastPerIndex())); } public void testStatsExistingHiddenDataStream() throws Exception { @@ -122,6 +136,8 @@ public void testStatsExistingHiddenDataStream() throws Exception { assertEquals(timestamp, stats.getDataStreams()[0].getMaximumTimestamp()); assertNotEquals(0L, stats.getDataStreams()[0].getStoreSize().getBytes()); assertEquals(stats.getTotalStoreSize().getBytes(), stats.getDataStreams()[0].getStoreSize().getBytes()); + assertThat(null, is(stats.getDataStreams()[0].getWriteLoadForecastPerShard())); + assertThat(null, is(stats.getDataStreams()[0].getWriteLoadForecastPerIndex())); } public void testStatsClosedBackingIndexDataStream() throws Exception { @@ -146,6 +162,8 @@ public void testStatsClosedBackingIndexDataStream() throws Exception { assertEquals(0L, stats.getDataStreams()[0].getMaximumTimestamp()); assertNotEquals(0L, stats.getDataStreams()[0].getStoreSize().getBytes()); assertEquals(stats.getTotalStoreSize().getBytes(), stats.getDataStreams()[0].getStoreSize().getBytes()); + assertThat(null, is(stats.getDataStreams()[0].getWriteLoadForecastPerShard())); + assertThat(null, is(stats.getDataStreams()[0].getWriteLoadForecastPerIndex())); // Call stats again after writing a new event into the write index long timestamp = createDocument(dataStreamName); @@ -162,6 +180,8 @@ public void testStatsClosedBackingIndexDataStream() throws Exception { assertEquals(timestamp, stats.getDataStreams()[0].getMaximumTimestamp()); assertNotEquals(0L, stats.getDataStreams()[0].getStoreSize().getBytes()); assertEquals(stats.getTotalStoreSize().getBytes(), stats.getDataStreams()[0].getStoreSize().getBytes()); + assertThat(null, is(stats.getDataStreams()[0].getWriteLoadForecastPerShard())); + assertThat(null, is(stats.getDataStreams()[0].getWriteLoadForecastPerIndex())); } public void testStatsRolledDataStream() throws Exception { @@ -182,6 +202,8 @@ public void testStatsRolledDataStream() throws Exception { assertEquals(timestamp, stats.getDataStreams()[0].getMaximumTimestamp()); assertNotEquals(0L, stats.getDataStreams()[0].getStoreSize().getBytes()); assertEquals(stats.getTotalStoreSize().getBytes(), stats.getDataStreams()[0].getStoreSize().getBytes()); + assertThat(null, is(stats.getDataStreams()[0].getWriteLoadForecastPerShard())); + assertThat(null, is(stats.getDataStreams()[0].getWriteLoadForecastPerIndex())); } public void testStatsMultipleDataStreams() throws Exception { @@ -213,16 +235,47 @@ public void testStatsMultipleDataStreams() throws Exception { assertEquals(1, dataStreamStats.getBackingIndices()); assertEquals(expectedMaxTS.longValue(), dataStreamStats.getMaximumTimestamp()); assertNotEquals(0L, dataStreamStats.getStoreSize().getBytes()); + assertThat(null, is(stats.getDataStreams()[0].getWriteLoadForecastPerShard())); + assertThat(null, is(stats.getDataStreams()[0].getWriteLoadForecastPerIndex())); } } + public void testWriteLoadStatsExistingDataStream() throws Exception { + int shards = 3; + String dataStreamName = createDataStream("write-load-test-", 3); + long timestamp = createDocument(dataStreamName); + + DataStreamsStatsAction.Response stats = getDataStreamsStats(); + assertEquals(shards, stats.getSuccessfulShards()); + assertEquals(0, stats.getFailedShards()); + assertEquals(1, stats.getDataStreamCount()); + assertEquals(1, stats.getBackingIndices()); + assertNotEquals(0L, stats.getTotalStoreSize().getBytes()); + assertEquals(1, stats.getDataStreams().length); + assertEquals(dataStreamName, stats.getDataStreams()[0].getDataStream()); + assertEquals(1, stats.getDataStreams()[0].getBackingIndices()); + assertEquals(timestamp, stats.getDataStreams()[0].getMaximumTimestamp()); + assertNotEquals(0L, stats.getDataStreams()[0].getStoreSize().getBytes()); + assertEquals(stats.getTotalStoreSize().getBytes(), stats.getDataStreams()[0].getStoreSize().getBytes()); + assertThat(WRITE_LOAD_FORECAST_PER_SHARD, is(stats.getDataStreams()[0].getWriteLoadForecastPerShard())); + assertThat(WRITE_LOAD_FORECAST_PER_SHARD * shards, is(stats.getDataStreams()[0].getWriteLoadForecastPerIndex())); + } + private String createDataStream() throws Exception { - return createDataStream(false); + return createDataStream(false, "", 1); } + private String createDataStream(String namePrefix, int shards) throws Exception { + return createDataStream(false, namePrefix, shards); + } private String createDataStream(boolean hidden) throws Exception { - String dataStreamName = randomAlphaOfLength(10).toLowerCase(Locale.getDefault()); - Template idxTemplate = new Template(null, new CompressedXContent(""" + return createDataStream(hidden, "", 1); + } + + private String createDataStream(boolean hidden, String namePrefix, int shards) throws Exception { + String dataStreamName = namePrefix + randomAlphaOfLength(10).toLowerCase(Locale.getDefault()); + Template idxTemplate = new Template(Settings.builder().put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, shards).build(), + new CompressedXContent(""" {"properties":{"@timestamp":{"type":"date"},"data":{"type":"keyword"}}} """), null); ComposableIndexTemplate template = ComposableIndexTemplate.builder() @@ -285,4 +338,35 @@ private void deleteDataStream(String dataStreamName) { ) ); } + + /** + * Plugin providing {@link WriteLoadForecaster} implementation for Test purposes. + * If the given index's name contains "write-load-test" the forecaster returns 0.25, else {@link OptionalDouble#empty()} + */ + public static class StaticWriteLoadForecasterPlugin extends Plugin implements ClusterPlugin { + + public static final double WRITE_LOAD_FORECAST_PER_SHARD = 0.25; + + @Override + public Collection createWriteLoadForecasters( + ThreadPool threadPool, + Settings settings, + ClusterSettings clusterSettings + ) { + return List.of(new WriteLoadForecaster() { + @Override + public Metadata.Builder withWriteLoadForecastForWriteIndex(String dataStreamName, Metadata.Builder metadata) { + return metadata; + } + + @Override + public OptionalDouble getForecastedWriteLoad(IndexMetadata indexMetadata) { + if (indexMetadata.getIndex().getName().contains("write-load-test")) { + return OptionalDouble.of(WRITE_LOAD_FORECAST_PER_SHARD); + } + return OptionalDouble.empty(); + } + }); + } + } } diff --git a/modules/data-streams/src/test/java/org/elasticsearch/datastreams/action/DataStreamsStatsResponseTests.java b/modules/data-streams/src/test/java/org/elasticsearch/datastreams/action/DataStreamsStatsResponseTests.java index 9874c19a78240..c45b0dc460ea1 100644 --- a/modules/data-streams/src/test/java/org/elasticsearch/datastreams/action/DataStreamsStatsResponseTests.java +++ b/modules/data-streams/src/test/java/org/elasticsearch/datastreams/action/DataStreamsStatsResponseTests.java @@ -47,12 +47,16 @@ public static DataStreamsStatsAction.Response randomStatsResponse() { long storeSize = randomLongBetween(250, 1000000000); totalStoreSize += storeSize; long maximumTimestamp = randomRecentTimestamp(); + Double writeLoadForecastPerShard = randomDouble(); + Double writeLoadForecastPerIndex = writeLoadForecastPerShard * randomInt(); dataStreamStats.add( new DataStreamsStatsAction.DataStreamStats( dataStreamName, backingIndices, ByteSizeValue.ofBytes(storeSize), - maximumTimestamp + maximumTimestamp, + writeLoadForecastPerShard, + writeLoadForecastPerIndex ) ); } diff --git a/modules/data-streams/src/yamlRestTest/resources/rest-api-spec/test/data_stream/120_data_streams_stats.yml b/modules/data-streams/src/yamlRestTest/resources/rest-api-spec/test/data_stream/120_data_streams_stats.yml index 7ee9e656007ed..40cad6987de6e 100644 --- a/modules/data-streams/src/yamlRestTest/resources/rest-api-spec/test/data_stream/120_data_streams_stats.yml +++ b/modules/data-streams/src/yamlRestTest/resources/rest-api-spec/test/data_stream/120_data_streams_stats.yml @@ -59,6 +59,8 @@ setup: - match: { data_streams.0.data_stream: 'simple-data-stream1' } - match: { data_streams.0.backing_indices: 2 } - match: { data_streams.0.maximum_timestamp: 0 } + - match: { data_streams.0.write_load_forecast_per_shard: null } + - match: { data_streams.0.write_load_forecast_per_index: null } - do: indices.delete_data_stream: @@ -91,6 +93,8 @@ setup: - match: { data_streams.0.data_stream: 'simple-data-stream1' } - match: { data_streams.0.backing_indices: 1 } - match: { data_streams.0.maximum_timestamp: 1593639273740 } + - match: { data_streams.0.write_load_forecast_per_shard: null } + - match: { data_streams.0.write_load_forecast_per_index: null } - do: indices.delete_data_stream: @@ -136,6 +140,8 @@ setup: - match: { data_streams.0.data_stream: 'simple-data-stream1' } - match: { data_streams.0.backing_indices: 2 } - match: { data_streams.0.maximum_timestamp: 1593639345064 } + - match: { data_streams.0.write_load_forecast_per_shard: null } + - match: { data_streams.0.write_load_forecast_per_index: null } - do: indices.delete_data_stream: @@ -193,9 +199,13 @@ setup: - match: { data_streams.0.data_stream: 'simple-data-stream1' } - match: { data_streams.0.backing_indices: 2 } - match: { data_streams.0.maximum_timestamp: 1593639468350 } + - match: { data_streams.0.write_load_forecast_per_shard: null } + - match: { data_streams.0.write_load_forecast_per_index: null } - match: { data_streams.1.data_stream: 'simple-data-stream2' } - match: { data_streams.1.backing_indices: 1 } - match: { data_streams.1.maximum_timestamp: 1593639450943 } + - match: { data_streams.1.write_load_forecast_per_shard: null } + - match: { data_streams.1.write_load_forecast_per_index: null } - do: indices.delete_data_stream: diff --git a/server/src/main/java/org/elasticsearch/action/datastreams/DataStreamsStatsAction.java b/server/src/main/java/org/elasticsearch/action/datastreams/DataStreamsStatsAction.java index 9a4eaf9a78e9b..df6c2efa2a7e9 100644 --- a/server/src/main/java/org/elasticsearch/action/datastreams/DataStreamsStatsAction.java +++ b/server/src/main/java/org/elasticsearch/action/datastreams/DataStreamsStatsAction.java @@ -17,6 +17,7 @@ import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.io.stream.Writeable; import org.elasticsearch.common.unit.ByteSizeValue; +import org.elasticsearch.core.Nullable; import org.elasticsearch.index.store.StoreStats; import org.elasticsearch.xcontent.ToXContentObject; import org.elasticsearch.xcontent.XContentBuilder; @@ -153,12 +154,25 @@ public static class DataStreamStats implements ToXContentObject, Writeable { private final int backingIndices; private final ByteSizeValue storeSize; private final long maximumTimestamp; + @Nullable + private final Double writeLoadForecastPerShard; + @Nullable + private final Double writeLoadForecastPerIndex; - public DataStreamStats(String dataStream, int backingIndices, ByteSizeValue storeSize, long maximumTimestamp) { + public DataStreamStats( + String dataStream, + int backingIndices, + ByteSizeValue storeSize, + long maximumTimestamp, + @Nullable Double writeLoadForecastPerShard, + @Nullable Double writeLoadForecastPerIndex + ) { this.dataStream = dataStream; this.backingIndices = backingIndices; this.storeSize = storeSize; this.maximumTimestamp = maximumTimestamp; + this.writeLoadForecastPerShard = writeLoadForecastPerShard; + this.writeLoadForecastPerIndex = writeLoadForecastPerIndex; } public DataStreamStats(StreamInput in) throws IOException { @@ -166,6 +180,8 @@ public DataStreamStats(StreamInput in) throws IOException { this.backingIndices = in.readVInt(); this.storeSize = ByteSizeValue.readFrom(in); this.maximumTimestamp = in.readVLong(); + this.writeLoadForecastPerShard = in.readOptionalDouble(); + this.writeLoadForecastPerIndex = in.readOptionalDouble(); } @Override @@ -174,6 +190,8 @@ public void writeTo(StreamOutput out) throws IOException { out.writeVInt(backingIndices); storeSize.writeTo(out); out.writeVLong(maximumTimestamp); + out.writeOptionalDouble(writeLoadForecastPerShard); + out.writeOptionalDouble(writeLoadForecastPerIndex); } @Override @@ -183,6 +201,8 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws builder.field("backing_indices", backingIndices); builder.humanReadableField("store_size_bytes", "store_size", storeSize); builder.field("maximum_timestamp", maximumTimestamp); + builder.field("write_load_forecast_per_shard", writeLoadForecastPerShard); + builder.field("write_load_forecast_per_index", writeLoadForecastPerIndex); builder.endObject(); return builder; } @@ -203,6 +223,14 @@ public long getMaximumTimestamp() { return maximumTimestamp; } + public Double getWriteLoadForecastPerShard() { + return writeLoadForecastPerShard; + } + + public Double getWriteLoadForecastPerIndex() { + return writeLoadForecastPerIndex; + } + @Override public boolean equals(Object obj) { if (this == obj) { @@ -214,13 +242,22 @@ public boolean equals(Object obj) { DataStreamStats that = (DataStreamStats) obj; return backingIndices == that.backingIndices && maximumTimestamp == that.maximumTimestamp + && Objects.equals(writeLoadForecastPerShard, that.writeLoadForecastPerShard) + && Objects.equals(writeLoadForecastPerIndex, that.writeLoadForecastPerIndex) && Objects.equals(dataStream, that.dataStream) && Objects.equals(storeSize, that.storeSize); } @Override public int hashCode() { - return Objects.hash(dataStream, backingIndices, storeSize, maximumTimestamp); + return Objects.hash( + dataStream, + backingIndices, + storeSize, + maximumTimestamp, + writeLoadForecastPerShard, + writeLoadForecastPerIndex + ); } @Override @@ -235,6 +272,10 @@ public String toString() { + storeSize + ", maximumTimestamp=" + maximumTimestamp + + ", writeLoadForecastPerShard=" + + writeLoadForecastPerShard + + ", writeLoadForecastPerIndex=" + + writeLoadForecastPerIndex + '}'; } }