Skip to content
Original file line number Diff line number Diff line change
Expand Up @@ -195,6 +195,7 @@ static TransportVersion def(int id) {
public static final TransportVersion INTRODUCE_LIFECYCLE_TEMPLATE = def(9_033_0_00);
public static final TransportVersion INDEXING_STATS_INCLUDES_RECENT_WRITE_LOAD = def(9_034_0_00);
public static final TransportVersion ESQL_AGGREGATE_METRIC_DOUBLE_LITERAL = def(9_035_0_00);
public static final TransportVersion INDEX_METADATA_INCLUDES_RECENT_WRITE_LOAD = def(9_036_0_00);

/*
* STOP! READ THIS FIRST! No, really,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -322,6 +322,17 @@ private AutoShardingResult getDecreaseShardsResult(

// Visible for testing
static long computeOptimalNumberOfShards(int minNumberWriteThreads, int maxNumberWriteThreads, double indexingLoad) {
/*
* Define:
* - shardsByMaxThreads = number of shards required to ensure no more than 50% utilization with max number of threads per shard
* - shardsByMinThreads = number of shards required to ensure no more than 50% utilization with min number of threads per shard
* Note that shardsByMaxThreads <= shardsByMinThreads.
* This returns:
* - shardsByMaxThreads if shardsByMaxThreads > 3
* - 3 if shardsByMaxThreads <= 3 and shardsByMinThreads > 3
* - shardsByMinThreads if 0 < shardsByMinThreads <= 3
* - 1 if shardsByMinThreads == 0
*/
return Math.max(
Math.max(
Math.min(roundUp(indexingLoad / (minNumberWriteThreads / 2.0)), 3),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,7 @@ public static IndexMetadataStats fromStatsResponse(IndexMetadata indexMetadata,
indexWriteLoadBuilder.withShardWriteLoad(
shardStats.getShardRouting().id(),
indexingShardStats.getWriteLoad(),
indexingShardStats.getRecentWriteLoad(),
indexingShardStats.getTotalActiveTimeInMillis()
);
totalSizeInBytes += commonStats.getDocs().getTotalSizeInBytes();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,11 @@

package org.elasticsearch.cluster.metadata;

import org.elasticsearch.TransportVersions;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.core.Nullable;
import org.elasticsearch.xcontent.ConstructingObjectParser;
import org.elasticsearch.xcontent.ParseField;
import org.elasticsearch.xcontent.ToXContentFragment;
Expand All @@ -27,24 +29,31 @@
public class IndexWriteLoad implements Writeable, ToXContentFragment {
public static final ParseField SHARDS_WRITE_LOAD_FIELD = new ParseField("loads");
public static final ParseField SHARDS_UPTIME_IN_MILLIS = new ParseField("uptimes");
public static final ParseField SHARDS_RECENT_WRITE_LOAD_FIELD = new ParseField("recent_loads");
private static final Double UNKNOWN_LOAD = -1.0;
private static final long UNKNOWN_UPTIME = -1;

@SuppressWarnings("unchecked")
private static final ConstructingObjectParser<IndexWriteLoad, Void> PARSER = new ConstructingObjectParser<>(
"index_write_load_parser",
false,
(args, unused) -> IndexWriteLoad.create((List<Double>) args[0], (List<Long>) args[1])
(args, unused) -> IndexWriteLoad.create((List<Double>) args[0], (List<Long>) args[1], (List<Double>) args[2])
);

static {
PARSER.declareDoubleArray(ConstructingObjectParser.constructorArg(), SHARDS_WRITE_LOAD_FIELD);
PARSER.declareLongArray(ConstructingObjectParser.constructorArg(), SHARDS_UPTIME_IN_MILLIS);
// The recent write load field is optional so that we can parse XContent built by older versions which did not include it:
PARSER.declareDoubleArray(ConstructingObjectParser.optionalConstructorArg(), SHARDS_RECENT_WRITE_LOAD_FIELD);
}

public static IndexWriteLoad create(List<Double> shardsWriteLoad, List<Long> shardsUptimeInMillis) {
private static IndexWriteLoad create(
List<Double> shardsWriteLoad,
List<Long> shardsUptimeInMillis,
@Nullable List<Double> shardsRecentWriteLoad
) {
if (shardsWriteLoad.size() != shardsUptimeInMillis.size()) {
assert false;
assert false : "IndexWriteLoad.create() was called with non-matched lengths for shardWriteLoad and shardUptimeInMillis";
throw new IllegalArgumentException(
"The same number of shard write loads and shard uptimes should be provided, but "
+ shardsWriteLoad
Expand All @@ -55,39 +64,70 @@ public static IndexWriteLoad create(List<Double> shardsWriteLoad, List<Long> sha
}

if (shardsWriteLoad.isEmpty()) {
assert false;
assert false : "IndexWriteLoad.create() was called with empty shardsRecentWriteLoad";
;
throw new IllegalArgumentException("At least one shard write load and uptime should be provided, but none was provided");
}

if (shardsRecentWriteLoad != null && shardsRecentWriteLoad.size() != shardsUptimeInMillis.size()) {
assert false : "IndexWriteLoad.create() was called with non-matched lengths for shardsRecentWriteLoad and shardUptimeInMillis";
throw new IllegalArgumentException(
"The same number of shard write loads and shard uptimes should be provided, but "
+ shardsWriteLoad
+ " "
+ shardsUptimeInMillis
+ " were provided"
);
}

return new IndexWriteLoad(
shardsWriteLoad.stream().mapToDouble(shardLoad -> shardLoad).toArray(),
shardsUptimeInMillis.stream().mapToLong(shardUptime -> shardUptime).toArray()
shardsUptimeInMillis.stream().mapToLong(shardUptime -> shardUptime).toArray(),
shardsRecentWriteLoad != null ? shardsRecentWriteLoad.stream().mapToDouble(shardLoad -> shardLoad).toArray() : null
);
}

private final double[] shardWriteLoad;
private final long[] shardUptimeInMillis;
private final double[] shardRecentWriteLoad;

private IndexWriteLoad(double[] shardWriteLoad, long[] shardUptimeInMillis) {
assert shardWriteLoad.length == shardUptimeInMillis.length;
private IndexWriteLoad(double[] shardWriteLoad, long[] shardUptimeInMillis, @Nullable double[] shardRecentWriteLoad) {
assert shardWriteLoad.length == shardUptimeInMillis.length
: "IndexWriteLoad constructor was called with non-matched lengths for shardWriteLoad and shardUptimeInMillis";
this.shardWriteLoad = shardWriteLoad;
this.shardUptimeInMillis = shardUptimeInMillis;
if (shardRecentWriteLoad != null) {
assert shardRecentWriteLoad.length == shardUptimeInMillis.length
: "IndexWriteLoad constructor was called with non-matched lengths for shardRecentWriteLoad and shardUptimeInMillis";
this.shardRecentWriteLoad = shardRecentWriteLoad;
} else {
this.shardRecentWriteLoad = new double[shardUptimeInMillis.length];
Arrays.fill(this.shardRecentWriteLoad, UNKNOWN_LOAD);
}
}

public IndexWriteLoad(StreamInput in) throws IOException {
this(in.readDoubleArray(), in.readLongArray());
this(
in.readDoubleArray(),
in.readLongArray(),
in.getTransportVersion().onOrAfter(TransportVersions.INDEX_METADATA_INCLUDES_RECENT_WRITE_LOAD) ? in.readDoubleArray() : null
);
}

@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeDoubleArray(shardWriteLoad);
out.writeLongArray(shardUptimeInMillis);
if (out.getTransportVersion().onOrAfter(TransportVersions.INDEX_METADATA_INCLUDES_RECENT_WRITE_LOAD)) {
out.writeDoubleArray(shardRecentWriteLoad);
}
}

@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.field(SHARDS_WRITE_LOAD_FIELD.getPreferredName(), shardWriteLoad);
builder.field(SHARDS_UPTIME_IN_MILLIS.getPreferredName(), shardUptimeInMillis);
builder.field(SHARDS_RECENT_WRITE_LOAD_FIELD.getPreferredName(), shardRecentWriteLoad);
return builder;
}

Expand All @@ -102,14 +142,20 @@ public OptionalDouble getWriteLoadForShard(int shardId) {
return load != UNKNOWN_LOAD ? OptionalDouble.of(load) : OptionalDouble.empty();
}

public OptionalDouble getRecentWriteLoadForShard(int shardId) {
assertShardInBounds(shardId);

double load = shardRecentWriteLoad[shardId];
return load != UNKNOWN_LOAD ? OptionalDouble.of(load) : OptionalDouble.empty();
}

public OptionalLong getUptimeInMillisForShard(int shardId) {
assertShardInBounds(shardId);

long uptime = shardUptimeInMillis[shardId];
return uptime != UNKNOWN_UPTIME ? OptionalLong.of(uptime) : OptionalLong.empty();
}

// Visible for testing
public int numberOfShards() {
return shardWriteLoad.length;
}
Expand All @@ -124,13 +170,16 @@ public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
IndexWriteLoad that = (IndexWriteLoad) o;
return Arrays.equals(shardWriteLoad, that.shardWriteLoad) && Arrays.equals(shardUptimeInMillis, that.shardUptimeInMillis);
return Arrays.equals(shardWriteLoad, that.shardWriteLoad)
&& Arrays.equals(shardUptimeInMillis, that.shardUptimeInMillis)
&& Arrays.equals(shardRecentWriteLoad, that.shardRecentWriteLoad);
}

@Override
public int hashCode() {
int result = Arrays.hashCode(shardWriteLoad);
result = 31 * result + Arrays.hashCode(shardUptimeInMillis);
result = 31 * result + Arrays.hashCode(shardRecentWriteLoad);
return result;
}

Expand All @@ -140,29 +189,33 @@ public static Builder builder(int numShards) {
}

public static class Builder {
final double[] shardWriteLoad;
final long[] uptimeInMillis;
private final double[] shardWriteLoad;
private final long[] uptimeInMillis;
private final double[] shardRecentWriteLoad;

private Builder(int numShards) {
this.shardWriteLoad = new double[numShards];
this.uptimeInMillis = new long[numShards];
this.shardRecentWriteLoad = new double[numShards];
Arrays.fill(shardWriteLoad, UNKNOWN_LOAD);
Arrays.fill(uptimeInMillis, UNKNOWN_UPTIME);
Arrays.fill(shardRecentWriteLoad, UNKNOWN_LOAD);
}

public Builder withShardWriteLoad(int shardId, double load, long uptimeInMillis) {
public Builder withShardWriteLoad(int shardId, double load, double recentLoad, long uptimeInMillis) {
if (shardId >= this.shardWriteLoad.length) {
throw new IllegalArgumentException();
}

this.shardWriteLoad[shardId] = load;
this.uptimeInMillis[shardId] = uptimeInMillis;
this.shardRecentWriteLoad[shardId] = recentLoad;

return this;
}

public IndexWriteLoad build() {
return new IndexWriteLoad(shardWriteLoad, uptimeInMillis);
return new IndexWriteLoad(shardWriteLoad, uptimeInMillis, shardRecentWriteLoad);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -298,7 +298,7 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
private final RefreshFieldHasValueListener refreshFieldHasValueListener;
private volatile boolean useRetentionLeasesInPeerRecovery;
private final LongSupplier relativeTimeInNanosSupplier;
private volatile long startedRelativeTimeInNanos;
private volatile long startedRelativeTimeInNanos = -1L; // use -1 to indicate this has not yet been set to its true value
private volatile long indexingTimeBeforeShardStartedInNanos;
private volatile double recentIndexingLoadAtShardStarted;
private final SubscribableListener<Void> waitForEngineOrClosedShardListeners = new SubscribableListener<>();
Expand Down Expand Up @@ -557,7 +557,10 @@ public void updateShardState(
: "a primary relocation is completed by the master, but primary mode is not active " + currentRouting;

changeState(IndexShardState.STARTED, "global state is [" + newRouting.state() + "]");
startedRelativeTimeInNanos = getRelativeTimeInNanos();
long relativeTimeInNanos = getRelativeTimeInNanos();
// We use -1 to indicate that startedRelativeTimeInNanos has yet not been set to its true value. So in the vanishingly
// unlikely case that getRelativeTimeInNanos() returns exactly -1, we advance by 1ns to avoid that special value.
startedRelativeTimeInNanos = (relativeTimeInNanos != -1L) ? relativeTimeInNanos : 0L;
indexingTimeBeforeShardStartedInNanos = internalIndexingStats.totalIndexingTimeInNanos();
recentIndexingLoadAtShardStarted = internalIndexingStats.recentIndexingLoad(startedRelativeTimeInNanos);
} else if (currentRouting.primary()
Expand Down Expand Up @@ -1370,11 +1373,14 @@ public IndexingStats indexingStats() {
}

long currentTimeInNanos = getRelativeTimeInNanos();
// We use -1 to indicate that startedRelativeTimeInNanos has yet not been set to its true value, i.e the shard has not started.
// In that case, we set timeSinceShardStartedInNanos to zero (which will result in all load metrics definitely being zero).
long timeSinceShardStartedInNanos = (startedRelativeTimeInNanos != -1L) ? (currentTimeInNanos - startedRelativeTimeInNanos) : 0L;
return internalIndexingStats.stats(
throttled,
throttleTimeInMillis,
indexingTimeBeforeShardStartedInNanos,
currentTimeInNanos - startedRelativeTimeInNanos,
timeSinceShardStartedInNanos,
currentTimeInNanos,
recentIndexingLoadAtShardStarted
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,16 +14,16 @@
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.core.TimeValue;

import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

/**
* Container for cluster settings related to {@link IndexingStats}.
*/
public class IndexingStatsSettings {

// TODO: Change this default to something sensible:
static final TimeValue RECENT_WRITE_LOAD_HALF_LIFE_DEFAULT = new TimeValue(10000, TimeUnit.DAYS);
static final TimeValue RECENT_WRITE_LOAD_HALF_LIFE_DEFAULT = TimeValue.timeValueMinutes(5); // Aligns with the interval between DSL runs
static final TimeValue RECENT_WRITE_LOAD_HALF_LIFE_MIN = TimeValue.timeValueSeconds(1); // A sub-second half-life makes no sense
static final TimeValue RECENT_WRITE_LOAD_HALF_LIFE_MAX = TimeValue.timeValueDays(100_000); // Long.MAX_VALUE nanos, rounded down

/**
* A cluster setting giving the half-life, in seconds, to use for the Exponentially Weighted Moving Rate calculation used for the
Expand All @@ -34,7 +34,8 @@ public class IndexingStatsSettings {
public static final Setting<TimeValue> RECENT_WRITE_LOAD_HALF_LIFE_SETTING = Setting.timeSetting(
"indices.stats.recent_write_load.half_life",
RECENT_WRITE_LOAD_HALF_LIFE_DEFAULT,
TimeValue.ZERO,
RECENT_WRITE_LOAD_HALF_LIFE_MIN,
RECENT_WRITE_LOAD_HALF_LIFE_MAX,
Setting.Property.Dynamic,
Setting.Property.NodeScope
);
Expand Down
Loading