Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 15 additions & 3 deletions docs/reference/indices/data-stream-stats.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -167,8 +167,16 @@ of the following conditions are met:
* Backing indices with a <<data-streams-generation,lower generation>> contain
higher `@timestamp` values.
=====
====

`write_load_forecast_per_shard`::
(Optional, double)
The forecasted load of a single shard of the data stream's write index.

`write_load_forecast_per_index`::
(Optional, double)
The forecasted load of all shards of the data stream's write index.

====
[[data-stream-stats-api-example]]
==== {api-examples-title}

Expand Down Expand Up @@ -197,14 +205,18 @@ The API returns the following response.
"backing_indices": 3,
"store_size": "3.7kb",
"store_size_bytes": 3772,
"maximum_timestamp": 1607512028000
"maximum_timestamp": 1607512028000,
"write_load_forecast_per_shard": 0.3453,
"write_load_forecast_per_index": 0.6906
},
{
"data_stream": "my-data-stream-two",
"backing_indices": 2,
"store_size": "3.4kb",
"store_size_bytes": 3496,
"maximum_timestamp": 1607425567000
"maximum_timestamp": 1607425567000,
"write_load_forecast_per_shard": 0.1254,
"write_load_forecast_per_index": 0.3762
}
]
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,13 +21,16 @@
import org.elasticsearch.cluster.block.ClusterBlockLevel;
import org.elasticsearch.cluster.metadata.DataStream;
import org.elasticsearch.cluster.metadata.IndexAbstraction;
import org.elasticsearch.cluster.metadata.IndexMetadata;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.cluster.routing.ShardsIterator;
import org.elasticsearch.cluster.routing.allocation.WriteLoadForecaster;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.core.Nullable;
import org.elasticsearch.index.Index;
import org.elasticsearch.index.IndexService;
import org.elasticsearch.index.engine.Engine;
Expand All @@ -43,6 +46,7 @@
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.OptionalDouble;
import java.util.Set;
import java.util.SortedMap;

Expand All @@ -52,14 +56,16 @@ public class DataStreamsStatsTransportAction extends TransportBroadcastByNodeAct
DataStreamsStatsAction.DataStreamShardStats> {

private final IndicesService indicesService;
private final WriteLoadForecaster writeLoadForecaster;

@Inject
public DataStreamsStatsTransportAction(
ClusterService clusterService,
TransportService transportService,
IndicesService indicesService,
ActionFilters actionFilters,
IndexNameExpressionResolver indexNameExpressionResolver
IndexNameExpressionResolver indexNameExpressionResolver,
WriteLoadForecaster writeLoadForecaster
) {
super(
DataStreamsStatsAction.NAME,
Expand All @@ -71,6 +77,7 @@ public DataStreamsStatsTransportAction(
transportService.getThreadPool().executor(ThreadPool.Names.MANAGEMENT)
);
this.indicesService = indicesService;
this.writeLoadForecaster = writeLoadForecaster;
}

@Override
Expand Down Expand Up @@ -167,6 +174,16 @@ protected DataStreamsStatsAction.DataStreamShardStats readShardResult(StreamInpu
if (indexAbstraction.getType() == IndexAbstraction.Type.DATA_STREAM) {
DataStream dataStream = (DataStream) indexAbstraction;
AggregatedStats stats = aggregatedDataStreamsStats.computeIfAbsent(dataStream.getName(), s -> new AggregatedStats());
Index writeIndex = indexAbstraction.getWriteIndex();
if (writeIndex != null) {
IndexMetadata writeIndexMetadata = clusterState.getMetadata().index(writeIndex);
OptionalDouble writeLoadForecast = writeLoadForecaster.getForecastedWriteLoad(writeIndexMetadata);
if (writeLoadForecast.isPresent()) {
stats.writeLoadForecastPerShard = writeLoadForecast.getAsDouble();
stats.writeLoadForecastPerIndex = stats.writeLoadForecastPerShard * writeIndexMetadata.getNumberOfShards();
}
}

dataStream.getIndices().stream().map(Index::getName).forEach(index -> {
stats.backingIndices.add(index);
allBackingIndices.add(index);
Expand Down Expand Up @@ -227,7 +244,9 @@ public DataStreamsStatsAction.Response newResponse(
entry.getKey(),
entry.getValue().backingIndices.size(),
ByteSizeValue.ofBytes(entry.getValue().storageBytes),
entry.getValue().maxTimestamp
entry.getValue().maxTimestamp,
entry.getValue().writeLoadForecastPerShard,
entry.getValue().writeLoadForecastPerIndex
)
)
.toArray(DataStreamsStatsAction.DataStreamStats[]::new);
Expand All @@ -249,5 +268,9 @@ private static class AggregatedStats {
Set<String> backingIndices = new HashSet<>();
long storageBytes = 0L;
long maxTimestamp = 0L;
@Nullable
Double writeLoadForecastPerIndex = null;
@Nullable
Double writeLoadForecastPerShard = null;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,17 @@
import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.cluster.health.ClusterHealthStatus;
import org.elasticsearch.cluster.metadata.ComposableIndexTemplate;
import org.elasticsearch.cluster.metadata.IndexMetadata;
import org.elasticsearch.cluster.metadata.Metadata;
import org.elasticsearch.cluster.metadata.Template;
import org.elasticsearch.cluster.routing.allocation.WriteLoadForecaster;
import org.elasticsearch.common.compress.CompressedXContent;
import org.elasticsearch.common.settings.ClusterSettings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.plugins.ClusterPlugin;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.test.ESSingleNodeTestCase;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.xcontent.json.JsonXContent;
import org.junit.After;

Expand All @@ -34,17 +41,20 @@
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.OptionalDouble;
import java.util.Set;
import java.util.concurrent.TimeUnit;

import static java.lang.Math.max;
import static org.elasticsearch.datastreams.DataStreamsStatsTests.StaticWriteLoadForecasterPlugin.WRITE_LOAD_FORECAST_PER_SHARD;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
import static org.hamcrest.Matchers.is;

public class DataStreamsStatsTests extends ESSingleNodeTestCase {

@Override
protected Collection<Class<? extends Plugin>> getPlugins() {
return List.of(DataStreamsPlugin.class);
return List.of(DataStreamsPlugin.class, StaticWriteLoadForecasterPlugin.class);
}

private final Set<String> createdDataStreams = new HashSet<>();
Expand Down Expand Up @@ -86,6 +96,8 @@ public void testStatsEmptyDataStream() throws Exception {
assertEquals(0L, stats.getDataStreams()[0].getMaximumTimestamp());
assertNotEquals(0L, stats.getDataStreams()[0].getStoreSize().getBytes());
assertEquals(stats.getTotalStoreSize().getBytes(), stats.getDataStreams()[0].getStoreSize().getBytes());
assertThat(null, is(stats.getDataStreams()[0].getWriteLoadForecastPerShard()));
assertThat(null, is(stats.getDataStreams()[0].getWriteLoadForecastPerIndex()));
}

public void testStatsExistingDataStream() throws Exception {
Expand All @@ -104,6 +116,8 @@ public void testStatsExistingDataStream() throws Exception {
assertEquals(timestamp, stats.getDataStreams()[0].getMaximumTimestamp());
assertNotEquals(0L, stats.getDataStreams()[0].getStoreSize().getBytes());
assertEquals(stats.getTotalStoreSize().getBytes(), stats.getDataStreams()[0].getStoreSize().getBytes());
assertThat(null, is(stats.getDataStreams()[0].getWriteLoadForecastPerShard()));
assertThat(null, is(stats.getDataStreams()[0].getWriteLoadForecastPerIndex()));
}

public void testStatsExistingHiddenDataStream() throws Exception {
Expand All @@ -122,6 +136,8 @@ public void testStatsExistingHiddenDataStream() throws Exception {
assertEquals(timestamp, stats.getDataStreams()[0].getMaximumTimestamp());
assertNotEquals(0L, stats.getDataStreams()[0].getStoreSize().getBytes());
assertEquals(stats.getTotalStoreSize().getBytes(), stats.getDataStreams()[0].getStoreSize().getBytes());
assertThat(null, is(stats.getDataStreams()[0].getWriteLoadForecastPerShard()));
assertThat(null, is(stats.getDataStreams()[0].getWriteLoadForecastPerIndex()));
}

public void testStatsClosedBackingIndexDataStream() throws Exception {
Expand All @@ -146,6 +162,8 @@ public void testStatsClosedBackingIndexDataStream() throws Exception {
assertEquals(0L, stats.getDataStreams()[0].getMaximumTimestamp());
assertNotEquals(0L, stats.getDataStreams()[0].getStoreSize().getBytes());
assertEquals(stats.getTotalStoreSize().getBytes(), stats.getDataStreams()[0].getStoreSize().getBytes());
assertThat(null, is(stats.getDataStreams()[0].getWriteLoadForecastPerShard()));
assertThat(null, is(stats.getDataStreams()[0].getWriteLoadForecastPerIndex()));

// Call stats again after writing a new event into the write index
long timestamp = createDocument(dataStreamName);
Expand All @@ -162,6 +180,8 @@ public void testStatsClosedBackingIndexDataStream() throws Exception {
assertEquals(timestamp, stats.getDataStreams()[0].getMaximumTimestamp());
assertNotEquals(0L, stats.getDataStreams()[0].getStoreSize().getBytes());
assertEquals(stats.getTotalStoreSize().getBytes(), stats.getDataStreams()[0].getStoreSize().getBytes());
assertThat(null, is(stats.getDataStreams()[0].getWriteLoadForecastPerShard()));
assertThat(null, is(stats.getDataStreams()[0].getWriteLoadForecastPerIndex()));
}

public void testStatsRolledDataStream() throws Exception {
Expand All @@ -182,6 +202,8 @@ public void testStatsRolledDataStream() throws Exception {
assertEquals(timestamp, stats.getDataStreams()[0].getMaximumTimestamp());
assertNotEquals(0L, stats.getDataStreams()[0].getStoreSize().getBytes());
assertEquals(stats.getTotalStoreSize().getBytes(), stats.getDataStreams()[0].getStoreSize().getBytes());
assertThat(null, is(stats.getDataStreams()[0].getWriteLoadForecastPerShard()));
assertThat(null, is(stats.getDataStreams()[0].getWriteLoadForecastPerIndex()));
}

public void testStatsMultipleDataStreams() throws Exception {
Expand Down Expand Up @@ -213,16 +235,47 @@ public void testStatsMultipleDataStreams() throws Exception {
assertEquals(1, dataStreamStats.getBackingIndices());
assertEquals(expectedMaxTS.longValue(), dataStreamStats.getMaximumTimestamp());
assertNotEquals(0L, dataStreamStats.getStoreSize().getBytes());
assertThat(null, is(stats.getDataStreams()[0].getWriteLoadForecastPerShard()));
assertThat(null, is(stats.getDataStreams()[0].getWriteLoadForecastPerIndex()));
}
}

public void testWriteLoadStatsExistingDataStream() throws Exception {
int shards = 3;
String dataStreamName = createDataStream("write-load-test-", 3);
long timestamp = createDocument(dataStreamName);

DataStreamsStatsAction.Response stats = getDataStreamsStats();
assertEquals(shards, stats.getSuccessfulShards());
assertEquals(0, stats.getFailedShards());
assertEquals(1, stats.getDataStreamCount());
assertEquals(1, stats.getBackingIndices());
assertNotEquals(0L, stats.getTotalStoreSize().getBytes());
assertEquals(1, stats.getDataStreams().length);
assertEquals(dataStreamName, stats.getDataStreams()[0].getDataStream());
assertEquals(1, stats.getDataStreams()[0].getBackingIndices());
assertEquals(timestamp, stats.getDataStreams()[0].getMaximumTimestamp());
assertNotEquals(0L, stats.getDataStreams()[0].getStoreSize().getBytes());
assertEquals(stats.getTotalStoreSize().getBytes(), stats.getDataStreams()[0].getStoreSize().getBytes());
assertThat(WRITE_LOAD_FORECAST_PER_SHARD, is(stats.getDataStreams()[0].getWriteLoadForecastPerShard()));
assertThat(WRITE_LOAD_FORECAST_PER_SHARD * shards, is(stats.getDataStreams()[0].getWriteLoadForecastPerIndex()));
}

private String createDataStream() throws Exception {
return createDataStream(false);
return createDataStream(false, "", 1);
}

private String createDataStream(String namePrefix, int shards) throws Exception {
return createDataStream(false, namePrefix, shards);
}
private String createDataStream(boolean hidden) throws Exception {
String dataStreamName = randomAlphaOfLength(10).toLowerCase(Locale.getDefault());
Template idxTemplate = new Template(null, new CompressedXContent("""
return createDataStream(hidden, "", 1);
}

private String createDataStream(boolean hidden, String namePrefix, int shards) throws Exception {
String dataStreamName = namePrefix + randomAlphaOfLength(10).toLowerCase(Locale.getDefault());
Template idxTemplate = new Template(Settings.builder().put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, shards).build(),
new CompressedXContent("""
{"properties":{"@timestamp":{"type":"date"},"data":{"type":"keyword"}}}
"""), null);
ComposableIndexTemplate template = ComposableIndexTemplate.builder()
Expand Down Expand Up @@ -285,4 +338,35 @@ private void deleteDataStream(String dataStreamName) {
)
);
}

/**
* Plugin providing {@link WriteLoadForecaster} implementation for Test purposes.
* If the given index's name contains "write-load-test" the forecaster returns 0.25, else {@link OptionalDouble#empty()}
*/
public static class StaticWriteLoadForecasterPlugin extends Plugin implements ClusterPlugin {

public static final double WRITE_LOAD_FORECAST_PER_SHARD = 0.25;

@Override
public Collection<WriteLoadForecaster> createWriteLoadForecasters(
ThreadPool threadPool,
Settings settings,
ClusterSettings clusterSettings
) {
return List.of(new WriteLoadForecaster() {
@Override
public Metadata.Builder withWriteLoadForecastForWriteIndex(String dataStreamName, Metadata.Builder metadata) {
return metadata;
}

@Override
public OptionalDouble getForecastedWriteLoad(IndexMetadata indexMetadata) {
if (indexMetadata.getIndex().getName().contains("write-load-test")) {
return OptionalDouble.of(WRITE_LOAD_FORECAST_PER_SHARD);
}
return OptionalDouble.empty();
}
});
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -47,12 +47,16 @@ public static DataStreamsStatsAction.Response randomStatsResponse() {
long storeSize = randomLongBetween(250, 1000000000);
totalStoreSize += storeSize;
long maximumTimestamp = randomRecentTimestamp();
Double writeLoadForecastPerShard = randomDouble();
Double writeLoadForecastPerIndex = writeLoadForecastPerShard * randomInt();
dataStreamStats.add(
new DataStreamsStatsAction.DataStreamStats(
dataStreamName,
backingIndices,
ByteSizeValue.ofBytes(storeSize),
maximumTimestamp
maximumTimestamp,
writeLoadForecastPerShard,
writeLoadForecastPerIndex
)
);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,8 @@ setup:
- match: { data_streams.0.data_stream: 'simple-data-stream1' }
- match: { data_streams.0.backing_indices: 2 }
- match: { data_streams.0.maximum_timestamp: 0 }
- match: { data_streams.0.write_load_forecast_per_shard: null }
- match: { data_streams.0.write_load_forecast_per_index: null }

- do:
indices.delete_data_stream:
Expand Down Expand Up @@ -91,6 +93,8 @@ setup:
- match: { data_streams.0.data_stream: 'simple-data-stream1' }
- match: { data_streams.0.backing_indices: 1 }
- match: { data_streams.0.maximum_timestamp: 1593639273740 }
- match: { data_streams.0.write_load_forecast_per_shard: null }
- match: { data_streams.0.write_load_forecast_per_index: null }

- do:
indices.delete_data_stream:
Expand Down Expand Up @@ -136,6 +140,8 @@ setup:
- match: { data_streams.0.data_stream: 'simple-data-stream1' }
- match: { data_streams.0.backing_indices: 2 }
- match: { data_streams.0.maximum_timestamp: 1593639345064 }
- match: { data_streams.0.write_load_forecast_per_shard: null }
- match: { data_streams.0.write_load_forecast_per_index: null }

- do:
indices.delete_data_stream:
Expand Down Expand Up @@ -193,9 +199,13 @@ setup:
- match: { data_streams.0.data_stream: 'simple-data-stream1' }
- match: { data_streams.0.backing_indices: 2 }
- match: { data_streams.0.maximum_timestamp: 1593639468350 }
- match: { data_streams.0.write_load_forecast_per_shard: null }
- match: { data_streams.0.write_load_forecast_per_index: null }
- match: { data_streams.1.data_stream: 'simple-data-stream2' }
- match: { data_streams.1.backing_indices: 1 }
- match: { data_streams.1.maximum_timestamp: 1593639450943 }
- match: { data_streams.1.write_load_forecast_per_shard: null }
- match: { data_streams.1.write_load_forecast_per_index: null }

- do:
indices.delete_data_stream:
Expand Down
Loading