Skip to content
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@
import org.elasticsearch.common.util.concurrent.EsExecutors;
import org.elasticsearch.core.Nullable;
import org.elasticsearch.index.shard.DocsStats;
import org.elasticsearch.index.shard.IndexingStats;
import org.elasticsearch.injection.guice.Inject;
import org.elasticsearch.tasks.CancellableTask;
import org.elasticsearch.tasks.Task;
Expand All @@ -68,6 +69,7 @@
import java.util.Objects;
import java.util.Optional;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.stream.Collectors;

/**
Expand Down Expand Up @@ -280,17 +282,13 @@ protected void masterOperation(
DataStream dataStream = (DataStream) indexAbstraction;
final Optional<IndexStats> indexStats = Optional.ofNullable(statsResponse)
.map(stats -> stats.getIndex(dataStream.getWriteIndex().getName()));

Double indexWriteLoad = indexStats.map(
stats -> Arrays.stream(stats.getShards())
.filter(shardStats -> shardStats.getStats().indexing != null)
// only take primaries into account as in stateful the replicas also index data
.filter(shardStats -> shardStats.getShardRouting().primary())
.map(shardStats -> shardStats.getStats().indexing.getTotal().getWriteLoad())
.reduce(0.0, Double::sum)
).orElse(null);

rolloverAutoSharding = dataStreamAutoShardingService.calculate(projectState, dataStream, indexWriteLoad);
rolloverAutoSharding = dataStreamAutoShardingService.calculate(
projectState,
dataStream,
indexStats.map(stats -> sumLoadMetrics(stats, IndexingStats.Stats::getWriteLoad)).orElse(null),
indexStats.map(stats -> sumLoadMetrics(stats, IndexingStats.Stats::getRecentWriteLoad)).orElse(null),
indexStats.map(stats -> sumLoadMetrics(stats, IndexingStats.Stats::getPeakWriteLoad)).orElse(null)
);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thinking out loud: what if we moved the write load calculations in the dataStreamAutoShardingService.calculate(...) and just pass the indexStats?

I think it fits the responsibility of the DataStreamAutoShardingService.java better and it can potentially allow us to do further improvements, if we deem that some write loads are not relevant.

What do you think?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, that's a good suggestion. I've pushed a commit to do this, see what you think.

I agree that it's better separation of responsibilities. It makes the tests a bit more complicated, because of all the stuff we have to construct to extract and sum those three values from. However it also increases test coverage, I think, because we didn't previously test the extraction and summation logic AFAICS (the tests for the rollover action never asserted that it was making the correct call to the auto-sharding service) and now we do. So the additional complication is in a good cause!

logger.debug("auto sharding result for data stream [{}] is [{}]", dataStream.getName(), rolloverAutoSharding);

// if auto sharding recommends increasing the number of shards we want to trigger a rollover even if there are no
Expand Down Expand Up @@ -354,6 +352,16 @@ protected void masterOperation(
);
}

private static Double sumLoadMetrics(IndexStats stats, Function<IndexingStats.Stats, Double> loadMetric) {
return Arrays.stream(stats.getShards())
.filter(shardStats -> shardStats.getStats().indexing != null)
// only take primaries into account as in stateful the replicas also index data
.filter(shardStats -> shardStats.getShardRouting().primary())
.map(shardStats -> shardStats.getStats().indexing.getTotal())
.map(loadMetric)
.reduce(0.0, Double::sum);
}

private void markForLazyRollover(
RolloverRequest rolloverRequest,
ActionListener<RolloverResponse> listener,
Expand Down
Loading