Skip to content

Commit 6331e61

Browse files
PeteGillinElastic authored and omricohenn committed
ES-10037 Persist recent write load in index metadata (elastic#125330)
This changes the default half-life for the Exponentially Weighted Moving Rate calculation used for the 'recent write load' metric in indexing stats to 5 minutes (as agreed over Slack) and persists the recent write load in the index metadata alongside the existing write load metric. The value is still not used in the data stream autosharding calculation; that will be yet one more PR. There are a couple of drive-by changes in this PR: (1) It adds a comment to DataStreamAutoShardingService.computeOptimalNumberOfShards, because the nested min and max calls are quite hard to understand at a glance. (2) It changes IndexShard.indexingStats() so that, if it is called before the shard has entered the started state, it uses a timeSinceShardStartedInNanos value of zero when calling InternalIndexingStats.stats(). Previously, it would have passed the current relative time in nanos as timeSinceShardStartedInNanos (because startedRelativeTimeInNanos would be zero), which is arbitrary and incorrect (since the zero point of System.nanoTime() is arbitrary). This didn't actually matter: InternalIndexingStats.postIndex would not increment the metrics while in recovery, so the numerator used to calculate the write load would be zero if the shard has not started, and an incorrect denominator would make no difference. However, it is good defensive coding not to rely on that, and to pass a correct value instead.
1 parent 4a3ce8e commit 6331e61

File tree

13 files changed

+245
-80
lines changed

13 files changed

+245
-80
lines changed

server/src/main/java/org/elasticsearch/TransportVersions.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -195,6 +195,7 @@ static TransportVersion def(int id) {
195195
public static final TransportVersion INTRODUCE_LIFECYCLE_TEMPLATE = def(9_033_0_00);
196196
public static final TransportVersion INDEXING_STATS_INCLUDES_RECENT_WRITE_LOAD = def(9_034_0_00);
197197
public static final TransportVersion ESQL_AGGREGATE_METRIC_DOUBLE_LITERAL = def(9_035_0_00);
198+
public static final TransportVersion INDEX_METADATA_INCLUDES_RECENT_WRITE_LOAD = def(9_036_0_00);
198199

199200
/*
200201
* STOP! READ THIS FIRST! No, really,

server/src/main/java/org/elasticsearch/action/datastreams/autosharding/DataStreamAutoShardingService.java

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -322,6 +322,17 @@ private AutoShardingResult getDecreaseShardsResult(
322322

323323
// Visible for testing
324324
static long computeOptimalNumberOfShards(int minNumberWriteThreads, int maxNumberWriteThreads, double indexingLoad) {
325+
/*
326+
* Define:
327+
* - shardsByMaxThreads = number of shards required to ensure no more than 50% utilization with max number of threads per shard
328+
* - shardsByMinThreads = number of shards required to ensure no more than 50% utilization with min number of threads per shard
329+
* Note that shardsByMaxThreads <= shardsByMinThreads.
330+
* This returns:
331+
* - shardsByMaxThreads if shardsByMaxThreads > 3
332+
* - 3 if shardsByMaxThreads <= 3 and shardsByMinThreads > 3
333+
* - shardsByMinThreads if 0 < shardsByMinThreads <= 3
334+
* - 1 if shardsByMinThreads == 0
335+
*/
325336
return Math.max(
326337
Math.max(
327338
Math.min(roundUp(indexingLoad / (minNumberWriteThreads / 2.0)), 3),

server/src/main/java/org/elasticsearch/cluster/metadata/IndexMetadataStats.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -108,6 +108,7 @@ public static IndexMetadataStats fromStatsResponse(IndexMetadata indexMetadata,
108108
indexWriteLoadBuilder.withShardWriteLoad(
109109
shardStats.getShardRouting().id(),
110110
indexingShardStats.getWriteLoad(),
111+
indexingShardStats.getRecentWriteLoad(),
111112
indexingShardStats.getTotalActiveTimeInMillis()
112113
);
113114
totalSizeInBytes += commonStats.getDocs().getTotalSizeInBytes();

server/src/main/java/org/elasticsearch/cluster/metadata/IndexWriteLoad.java

Lines changed: 67 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -9,9 +9,11 @@
99

1010
package org.elasticsearch.cluster.metadata;
1111

12+
import org.elasticsearch.TransportVersions;
1213
import org.elasticsearch.common.io.stream.StreamInput;
1314
import org.elasticsearch.common.io.stream.StreamOutput;
1415
import org.elasticsearch.common.io.stream.Writeable;
16+
import org.elasticsearch.core.Nullable;
1517
import org.elasticsearch.xcontent.ConstructingObjectParser;
1618
import org.elasticsearch.xcontent.ParseField;
1719
import org.elasticsearch.xcontent.ToXContentFragment;
@@ -27,24 +29,31 @@
2729
public class IndexWriteLoad implements Writeable, ToXContentFragment {
2830
public static final ParseField SHARDS_WRITE_LOAD_FIELD = new ParseField("loads");
2931
public static final ParseField SHARDS_UPTIME_IN_MILLIS = new ParseField("uptimes");
32+
public static final ParseField SHARDS_RECENT_WRITE_LOAD_FIELD = new ParseField("recent_loads");
3033
private static final Double UNKNOWN_LOAD = -1.0;
3134
private static final long UNKNOWN_UPTIME = -1;
3235

3336
@SuppressWarnings("unchecked")
3437
private static final ConstructingObjectParser<IndexWriteLoad, Void> PARSER = new ConstructingObjectParser<>(
3538
"index_write_load_parser",
3639
false,
37-
(args, unused) -> IndexWriteLoad.create((List<Double>) args[0], (List<Long>) args[1])
40+
(args, unused) -> IndexWriteLoad.create((List<Double>) args[0], (List<Long>) args[1], (List<Double>) args[2])
3841
);
3942

4043
static {
4144
PARSER.declareDoubleArray(ConstructingObjectParser.constructorArg(), SHARDS_WRITE_LOAD_FIELD);
4245
PARSER.declareLongArray(ConstructingObjectParser.constructorArg(), SHARDS_UPTIME_IN_MILLIS);
46+
// The recent write load field is optional so that we can parse XContent built by older versions which did not include it:
47+
PARSER.declareDoubleArray(ConstructingObjectParser.optionalConstructorArg(), SHARDS_RECENT_WRITE_LOAD_FIELD);
4348
}
4449

45-
public static IndexWriteLoad create(List<Double> shardsWriteLoad, List<Long> shardsUptimeInMillis) {
50+
private static IndexWriteLoad create(
51+
List<Double> shardsWriteLoad,
52+
List<Long> shardsUptimeInMillis,
53+
@Nullable List<Double> shardsRecentWriteLoad
54+
) {
4655
if (shardsWriteLoad.size() != shardsUptimeInMillis.size()) {
47-
assert false;
56+
assert false : "IndexWriteLoad.create() was called with non-matched lengths for shardWriteLoad and shardUptimeInMillis";
4857
throw new IllegalArgumentException(
4958
"The same number of shard write loads and shard uptimes should be provided, but "
5059
+ shardsWriteLoad
@@ -55,39 +64,70 @@ public static IndexWriteLoad create(List<Double> shardsWriteLoad, List<Long> sha
5564
}
5665

5766
if (shardsWriteLoad.isEmpty()) {
58-
assert false;
67+
assert false : "IndexWriteLoad.create() was called with empty shardsRecentWriteLoad";
68+
;
5969
throw new IllegalArgumentException("At least one shard write load and uptime should be provided, but none was provided");
6070
}
6171

72+
if (shardsRecentWriteLoad != null && shardsRecentWriteLoad.size() != shardsUptimeInMillis.size()) {
73+
assert false : "IndexWriteLoad.create() was called with non-matched lengths for shardsRecentWriteLoad and shardUptimeInMillis";
74+
throw new IllegalArgumentException(
75+
"The same number of shard write loads and shard uptimes should be provided, but "
76+
+ shardsWriteLoad
77+
+ " "
78+
+ shardsUptimeInMillis
79+
+ " were provided"
80+
);
81+
}
82+
6283
return new IndexWriteLoad(
6384
shardsWriteLoad.stream().mapToDouble(shardLoad -> shardLoad).toArray(),
64-
shardsUptimeInMillis.stream().mapToLong(shardUptime -> shardUptime).toArray()
85+
shardsUptimeInMillis.stream().mapToLong(shardUptime -> shardUptime).toArray(),
86+
shardsRecentWriteLoad != null ? shardsRecentWriteLoad.stream().mapToDouble(shardLoad -> shardLoad).toArray() : null
6587
);
6688
}
6789

6890
private final double[] shardWriteLoad;
6991
private final long[] shardUptimeInMillis;
92+
private final double[] shardRecentWriteLoad;
7093

71-
private IndexWriteLoad(double[] shardWriteLoad, long[] shardUptimeInMillis) {
72-
assert shardWriteLoad.length == shardUptimeInMillis.length;
94+
private IndexWriteLoad(double[] shardWriteLoad, long[] shardUptimeInMillis, @Nullable double[] shardRecentWriteLoad) {
95+
assert shardWriteLoad.length == shardUptimeInMillis.length
96+
: "IndexWriteLoad constructor was called with non-matched lengths for shardWriteLoad and shardUptimeInMillis";
7397
this.shardWriteLoad = shardWriteLoad;
7498
this.shardUptimeInMillis = shardUptimeInMillis;
99+
if (shardRecentWriteLoad != null) {
100+
assert shardRecentWriteLoad.length == shardUptimeInMillis.length
101+
: "IndexWriteLoad constructor was called with non-matched lengths for shardRecentWriteLoad and shardUptimeInMillis";
102+
this.shardRecentWriteLoad = shardRecentWriteLoad;
103+
} else {
104+
this.shardRecentWriteLoad = new double[shardUptimeInMillis.length];
105+
Arrays.fill(this.shardRecentWriteLoad, UNKNOWN_LOAD);
106+
}
75107
}
76108

77109
public IndexWriteLoad(StreamInput in) throws IOException {
78-
this(in.readDoubleArray(), in.readLongArray());
110+
this(
111+
in.readDoubleArray(),
112+
in.readLongArray(),
113+
in.getTransportVersion().onOrAfter(TransportVersions.INDEX_METADATA_INCLUDES_RECENT_WRITE_LOAD) ? in.readDoubleArray() : null
114+
);
79115
}
80116

81117
@Override
82118
public void writeTo(StreamOutput out) throws IOException {
83119
out.writeDoubleArray(shardWriteLoad);
84120
out.writeLongArray(shardUptimeInMillis);
121+
if (out.getTransportVersion().onOrAfter(TransportVersions.INDEX_METADATA_INCLUDES_RECENT_WRITE_LOAD)) {
122+
out.writeDoubleArray(shardRecentWriteLoad);
123+
}
85124
}
86125

87126
@Override
88127
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
89128
builder.field(SHARDS_WRITE_LOAD_FIELD.getPreferredName(), shardWriteLoad);
90129
builder.field(SHARDS_UPTIME_IN_MILLIS.getPreferredName(), shardUptimeInMillis);
130+
builder.field(SHARDS_RECENT_WRITE_LOAD_FIELD.getPreferredName(), shardRecentWriteLoad);
91131
return builder;
92132
}
93133

@@ -102,14 +142,20 @@ public OptionalDouble getWriteLoadForShard(int shardId) {
102142
return load != UNKNOWN_LOAD ? OptionalDouble.of(load) : OptionalDouble.empty();
103143
}
104144

145+
public OptionalDouble getRecentWriteLoadForShard(int shardId) {
146+
assertShardInBounds(shardId);
147+
148+
double load = shardRecentWriteLoad[shardId];
149+
return load != UNKNOWN_LOAD ? OptionalDouble.of(load) : OptionalDouble.empty();
150+
}
151+
105152
public OptionalLong getUptimeInMillisForShard(int shardId) {
106153
assertShardInBounds(shardId);
107154

108155
long uptime = shardUptimeInMillis[shardId];
109156
return uptime != UNKNOWN_UPTIME ? OptionalLong.of(uptime) : OptionalLong.empty();
110157
}
111158

112-
// Visible for testing
113159
public int numberOfShards() {
114160
return shardWriteLoad.length;
115161
}
@@ -124,13 +170,16 @@ public boolean equals(Object o) {
124170
if (this == o) return true;
125171
if (o == null || getClass() != o.getClass()) return false;
126172
IndexWriteLoad that = (IndexWriteLoad) o;
127-
return Arrays.equals(shardWriteLoad, that.shardWriteLoad) && Arrays.equals(shardUptimeInMillis, that.shardUptimeInMillis);
173+
return Arrays.equals(shardWriteLoad, that.shardWriteLoad)
174+
&& Arrays.equals(shardUptimeInMillis, that.shardUptimeInMillis)
175+
&& Arrays.equals(shardRecentWriteLoad, that.shardRecentWriteLoad);
128176
}
129177

130178
@Override
131179
public int hashCode() {
132180
int result = Arrays.hashCode(shardWriteLoad);
133181
result = 31 * result + Arrays.hashCode(shardUptimeInMillis);
182+
result = 31 * result + Arrays.hashCode(shardRecentWriteLoad);
134183
return result;
135184
}
136185

@@ -140,29 +189,33 @@ public static Builder builder(int numShards) {
140189
}
141190

142191
public static class Builder {
143-
final double[] shardWriteLoad;
144-
final long[] uptimeInMillis;
192+
private final double[] shardWriteLoad;
193+
private final long[] uptimeInMillis;
194+
private final double[] shardRecentWriteLoad;
145195

146196
private Builder(int numShards) {
147197
this.shardWriteLoad = new double[numShards];
148198
this.uptimeInMillis = new long[numShards];
199+
this.shardRecentWriteLoad = new double[numShards];
149200
Arrays.fill(shardWriteLoad, UNKNOWN_LOAD);
150201
Arrays.fill(uptimeInMillis, UNKNOWN_UPTIME);
202+
Arrays.fill(shardRecentWriteLoad, UNKNOWN_LOAD);
151203
}
152204

153-
public Builder withShardWriteLoad(int shardId, double load, long uptimeInMillis) {
205+
public Builder withShardWriteLoad(int shardId, double load, double recentLoad, long uptimeInMillis) {
154206
if (shardId >= this.shardWriteLoad.length) {
155207
throw new IllegalArgumentException();
156208
}
157209

158210
this.shardWriteLoad[shardId] = load;
159211
this.uptimeInMillis[shardId] = uptimeInMillis;
212+
this.shardRecentWriteLoad[shardId] = recentLoad;
160213

161214
return this;
162215
}
163216

164217
public IndexWriteLoad build() {
165-
return new IndexWriteLoad(shardWriteLoad, uptimeInMillis);
218+
return new IndexWriteLoad(shardWriteLoad, uptimeInMillis, shardRecentWriteLoad);
166219
}
167220
}
168221
}

server/src/main/java/org/elasticsearch/index/shard/IndexShard.java

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -298,7 +298,7 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
298298
private final RefreshFieldHasValueListener refreshFieldHasValueListener;
299299
private volatile boolean useRetentionLeasesInPeerRecovery;
300300
private final LongSupplier relativeTimeInNanosSupplier;
301-
private volatile long startedRelativeTimeInNanos;
301+
private volatile long startedRelativeTimeInNanos = -1L; // use -1 to indicate this has not yet been set to its true value
302302
private volatile long indexingTimeBeforeShardStartedInNanos;
303303
private volatile double recentIndexingLoadAtShardStarted;
304304
private final SubscribableListener<Void> waitForEngineOrClosedShardListeners = new SubscribableListener<>();
@@ -557,7 +557,10 @@ public void updateShardState(
557557
: "a primary relocation is completed by the master, but primary mode is not active " + currentRouting;
558558

559559
changeState(IndexShardState.STARTED, "global state is [" + newRouting.state() + "]");
560-
startedRelativeTimeInNanos = getRelativeTimeInNanos();
560+
long relativeTimeInNanos = getRelativeTimeInNanos();
561+
// We use -1 to indicate that startedRelativeTimeInNanos has not yet been set to its true value. So in the vanishingly
562+
// unlikely case that getRelativeTimeInNanos() returns exactly -1, we advance by 1ns to avoid that special value.
563+
startedRelativeTimeInNanos = (relativeTimeInNanos != -1L) ? relativeTimeInNanos : 0L;
561564
indexingTimeBeforeShardStartedInNanos = internalIndexingStats.totalIndexingTimeInNanos();
562565
recentIndexingLoadAtShardStarted = internalIndexingStats.recentIndexingLoad(startedRelativeTimeInNanos);
563566
} else if (currentRouting.primary()
@@ -1370,11 +1373,14 @@ public IndexingStats indexingStats() {
13701373
}
13711374

13721375
long currentTimeInNanos = getRelativeTimeInNanos();
1376+
// We use -1 to indicate that startedRelativeTimeInNanos has not yet been set to its true value, i.e. the shard has not started.
1377+
// In that case, we set timeSinceShardStartedInNanos to zero (which will result in all load metrics definitely being zero).
1378+
long timeSinceShardStartedInNanos = (startedRelativeTimeInNanos != -1L) ? (currentTimeInNanos - startedRelativeTimeInNanos) : 0L;
13731379
return internalIndexingStats.stats(
13741380
throttled,
13751381
throttleTimeInMillis,
13761382
indexingTimeBeforeShardStartedInNanos,
1377-
currentTimeInNanos - startedRelativeTimeInNanos,
1383+
timeSinceShardStartedInNanos,
13781384
currentTimeInNanos,
13791385
recentIndexingLoadAtShardStarted
13801386
);

server/src/main/java/org/elasticsearch/index/shard/IndexingStatsSettings.java

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -14,16 +14,16 @@
1414
import org.elasticsearch.common.settings.Settings;
1515
import org.elasticsearch.core.TimeValue;
1616

17-
import java.util.concurrent.TimeUnit;
1817
import java.util.concurrent.atomic.AtomicReference;
1918

2019
/**
2120
* Container for cluster settings related to {@link IndexingStats}.
2221
*/
2322
public class IndexingStatsSettings {
2423

25-
// TODO: Change this default to something sensible:
26-
static final TimeValue RECENT_WRITE_LOAD_HALF_LIFE_DEFAULT = new TimeValue(10000, TimeUnit.DAYS);
24+
static final TimeValue RECENT_WRITE_LOAD_HALF_LIFE_DEFAULT = TimeValue.timeValueMinutes(5); // Aligns with the interval between DSL runs
25+
static final TimeValue RECENT_WRITE_LOAD_HALF_LIFE_MIN = TimeValue.timeValueSeconds(1); // A sub-second half-life makes no sense
26+
static final TimeValue RECENT_WRITE_LOAD_HALF_LIFE_MAX = TimeValue.timeValueDays(100_000); // Long.MAX_VALUE nanos, rounded down
2727

2828
/**
2929
* A cluster setting giving the half-life, in seconds, to use for the Exponentially Weighted Moving Rate calculation used for the
@@ -34,7 +34,8 @@ public class IndexingStatsSettings {
3434
public static final Setting<TimeValue> RECENT_WRITE_LOAD_HALF_LIFE_SETTING = Setting.timeSetting(
3535
"indices.stats.recent_write_load.half_life",
3636
RECENT_WRITE_LOAD_HALF_LIFE_DEFAULT,
37-
TimeValue.ZERO,
37+
RECENT_WRITE_LOAD_HALF_LIFE_MIN,
38+
RECENT_WRITE_LOAD_HALF_LIFE_MAX,
3839
Setting.Property.Dynamic,
3940
Setting.Property.NodeScope
4041
);

0 commit comments

Comments
 (0)