Skip to content

Commit 66432fb

Browse files
ES-10037 Track the peak indexing load for each shard (#125521)
This tracks the highest value seen for the recent write load metric any time the stats for a shard was computed, exposes this value alongside the recent value, and persists it in index metadata alongside it too. The new test in `IndexShardTests` is designed to more thoroughly test the recent write load metric previously added, as well as to test the peak metric being added here. ES-10037 #comment Added peak load metric in #125521
1 parent 8ced682 commit 66432fb

File tree

21 files changed

+317
-90
lines changed

21 files changed

+317
-90
lines changed

modules/data-streams/src/internalClusterTest/java/org/elasticsearch/datastreams/DataStreamAutoshardingIT.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -527,7 +527,7 @@ private static ShardStats getShardStats(IndexMetadata indexMeta, int shardIndex,
527527
CommonStats stats = new CommonStats();
528528
stats.docs = new DocsStats(100, 0, randomByteSizeValue().getBytes());
529529
stats.store = new StoreStats();
530-
stats.indexing = new IndexingStats(new IndexingStats.Stats(1, 1, 1, 1, 1, 1, 1, 1, 1, false, 1, targetWriteLoad, 1, 0.123));
530+
stats.indexing = new IndexingStats(new IndexingStats.Stats(1, 1, 1, 1, 1, 1, 1, 1, 1, false, 1, targetWriteLoad, 1, 0.123, 0.234));
531531
return new ShardStats(shardRouting, new ShardPath(false, path, path, shardId), stats, null, null, null, false, 0);
532532
}
533533

server/src/main/java/org/elasticsearch/TransportVersions.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -206,6 +206,7 @@ static TransportVersion def(int id) {
206206
public static final TransportVersion ESQL_REPORT_ORIGINAL_TYPES = def(9_038_00_0);
207207
public static final TransportVersion RESCORE_VECTOR_ALLOW_ZERO = def(9_039_0_00);
208208
public static final TransportVersion PROJECT_ID_IN_SNAPSHOT = def(9_040_0_00);
209+
public static final TransportVersion INDEX_STATS_AND_METADATA_INCLUDE_PEAK_WRITE_LOAD = def(9_041_0_00);
209210

210211
/*
211212
* STOP! READ THIS FIRST! No, really,

server/src/main/java/org/elasticsearch/cluster/metadata/IndexMetadataStats.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -109,6 +109,7 @@ public static IndexMetadataStats fromStatsResponse(IndexMetadata indexMetadata,
109109
shardStats.getShardRouting().id(),
110110
indexingShardStats.getWriteLoad(),
111111
indexingShardStats.getRecentWriteLoad(),
112+
indexingShardStats.getPeakWriteLoad(),
112113
indexingShardStats.getTotalActiveTimeInMillis()
113114
);
114115
totalSizeInBytes += commonStats.getDocs().getTotalSizeInBytes();

server/src/main/java/org/elasticsearch/cluster/metadata/IndexWriteLoad.java

Lines changed: 67 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,6 @@
99

1010
package org.elasticsearch.cluster.metadata;
1111

12-
import org.elasticsearch.TransportVersions;
1312
import org.elasticsearch.common.io.stream.StreamInput;
1413
import org.elasticsearch.common.io.stream.StreamOutput;
1514
import org.elasticsearch.common.io.stream.Writeable;
@@ -26,31 +25,42 @@
2625
import java.util.OptionalDouble;
2726
import java.util.OptionalLong;
2827

28+
import static org.elasticsearch.TransportVersions.INDEX_METADATA_INCLUDES_RECENT_WRITE_LOAD;
29+
import static org.elasticsearch.TransportVersions.INDEX_STATS_AND_METADATA_INCLUDE_PEAK_WRITE_LOAD;
30+
2931
public class IndexWriteLoad implements Writeable, ToXContentFragment {
3032
public static final ParseField SHARDS_WRITE_LOAD_FIELD = new ParseField("loads");
3133
public static final ParseField SHARDS_UPTIME_IN_MILLIS = new ParseField("uptimes");
3234
public static final ParseField SHARDS_RECENT_WRITE_LOAD_FIELD = new ParseField("recent_loads");
35+
public static final ParseField SHARDS_PEAK_WRITE_LOAD_FIELD = new ParseField("peak_loads");
3336
private static final Double UNKNOWN_LOAD = -1.0;
3437
private static final long UNKNOWN_UPTIME = -1;
3538

3639
@SuppressWarnings("unchecked")
3740
private static final ConstructingObjectParser<IndexWriteLoad, Void> PARSER = new ConstructingObjectParser<>(
3841
"index_write_load_parser",
3942
false,
40-
(args, unused) -> IndexWriteLoad.create((List<Double>) args[0], (List<Long>) args[1], (List<Double>) args[2])
43+
(args, unused) -> IndexWriteLoad.create(
44+
(List<Double>) args[0],
45+
(List<Long>) args[1],
46+
(List<Double>) args[2],
47+
(List<Double>) args[3]
48+
)
4149
);
4250

4351
static {
4452
PARSER.declareDoubleArray(ConstructingObjectParser.constructorArg(), SHARDS_WRITE_LOAD_FIELD);
4553
PARSER.declareLongArray(ConstructingObjectParser.constructorArg(), SHARDS_UPTIME_IN_MILLIS);
46-
// The recent write load field is optional so that we can parse XContent built by older versions which did not include it:
54+
// The recent and peak write load fields are optional so that we can parse XContent built by older versions which did not have them:
4755
PARSER.declareDoubleArray(ConstructingObjectParser.optionalConstructorArg(), SHARDS_RECENT_WRITE_LOAD_FIELD);
56+
PARSER.declareDoubleArray(ConstructingObjectParser.optionalConstructorArg(), SHARDS_PEAK_WRITE_LOAD_FIELD);
4857
}
4958

5059
private static IndexWriteLoad create(
5160
List<Double> shardsWriteLoad,
5261
List<Long> shardsUptimeInMillis,
53-
@Nullable List<Double> shardsRecentWriteLoad
62+
@Nullable List<Double> shardsRecentWriteLoad,
63+
@Nullable List<Double> shardsPeakWriteLoad
5464
) {
5565
if (shardsWriteLoad.size() != shardsUptimeInMillis.size()) {
5666
assert false : "IndexWriteLoad.create() was called with non-matched lengths for shardWriteLoad and shardUptimeInMillis";
@@ -72,8 +82,19 @@ private static IndexWriteLoad create(
7282
if (shardsRecentWriteLoad != null && shardsRecentWriteLoad.size() != shardsUptimeInMillis.size()) {
7383
assert false : "IndexWriteLoad.create() was called with non-matched lengths for shardsRecentWriteLoad and shardUptimeInMillis";
7484
throw new IllegalArgumentException(
75-
"The same number of shard write loads and shard uptimes should be provided, but "
76-
+ shardsWriteLoad
85+
"The same number of shard recent write loads and shard uptimes should be provided, but "
86+
+ shardsRecentWriteLoad
87+
+ " "
88+
+ shardsUptimeInMillis
89+
+ " were provided"
90+
);
91+
}
92+
93+
if (shardsPeakWriteLoad != null && shardsPeakWriteLoad.size() != shardsUptimeInMillis.size()) {
94+
assert false : "IndexWriteLoad.create() was called with non-matched lengths for shardsPeakWriteLoad and shardUptimeInMillis";
95+
throw new IllegalArgumentException(
96+
"The same number of shard peak write loads and shard uptimes should be provided, but "
97+
+ shardsPeakWriteLoad
7798
+ " "
7899
+ shardsUptimeInMillis
79100
+ " were provided"
@@ -83,15 +104,22 @@ private static IndexWriteLoad create(
83104
return new IndexWriteLoad(
84105
shardsWriteLoad.stream().mapToDouble(shardLoad -> shardLoad).toArray(),
85106
shardsUptimeInMillis.stream().mapToLong(shardUptime -> shardUptime).toArray(),
86-
shardsRecentWriteLoad != null ? shardsRecentWriteLoad.stream().mapToDouble(shardLoad -> shardLoad).toArray() : null
107+
shardsRecentWriteLoad != null ? shardsRecentWriteLoad.stream().mapToDouble(shardLoad -> shardLoad).toArray() : null,
108+
shardsPeakWriteLoad != null ? shardsPeakWriteLoad.stream().mapToDouble(shardLoad -> shardLoad).toArray() : null
87109
);
88110
}
89111

90112
private final double[] shardWriteLoad;
91113
private final long[] shardUptimeInMillis;
92114
private final double[] shardRecentWriteLoad;
115+
private final double[] shardPeakWriteLoad;
93116

94-
private IndexWriteLoad(double[] shardWriteLoad, long[] shardUptimeInMillis, @Nullable double[] shardRecentWriteLoad) {
117+
private IndexWriteLoad(
118+
double[] shardWriteLoad,
119+
long[] shardUptimeInMillis,
120+
@Nullable double[] shardRecentWriteLoad,
121+
@Nullable double[] shardPeakWriteLoad
122+
) {
95123
assert shardWriteLoad.length == shardUptimeInMillis.length
96124
: "IndexWriteLoad constructor was called with non-matched lengths for shardWriteLoad and shardUptimeInMillis";
97125
this.shardWriteLoad = shardWriteLoad;
@@ -104,30 +132,43 @@ private IndexWriteLoad(double[] shardWriteLoad, long[] shardUptimeInMillis, @Nul
104132
this.shardRecentWriteLoad = new double[shardUptimeInMillis.length];
105133
Arrays.fill(this.shardRecentWriteLoad, UNKNOWN_LOAD);
106134
}
135+
if (shardPeakWriteLoad != null) {
136+
assert shardPeakWriteLoad.length == shardUptimeInMillis.length
137+
: "IndexWriteLoad constructor was called with non-matched lengths for shardPeakWriteLoad and shardUptimeInMillis";
138+
this.shardPeakWriteLoad = shardPeakWriteLoad;
139+
} else {
140+
this.shardPeakWriteLoad = new double[shardUptimeInMillis.length];
141+
Arrays.fill(this.shardPeakWriteLoad, UNKNOWN_LOAD);
142+
}
107143
}
108144

109145
public IndexWriteLoad(StreamInput in) throws IOException {
110146
this(
111147
in.readDoubleArray(),
112148
in.readLongArray(),
113-
in.getTransportVersion().onOrAfter(TransportVersions.INDEX_METADATA_INCLUDES_RECENT_WRITE_LOAD) ? in.readDoubleArray() : null
149+
in.getTransportVersion().onOrAfter(INDEX_METADATA_INCLUDES_RECENT_WRITE_LOAD) ? in.readDoubleArray() : null,
150+
in.getTransportVersion().onOrAfter(INDEX_STATS_AND_METADATA_INCLUDE_PEAK_WRITE_LOAD) ? in.readDoubleArray() : null
114151
);
115152
}
116153

117154
@Override
118155
public void writeTo(StreamOutput out) throws IOException {
119156
out.writeDoubleArray(shardWriteLoad);
120157
out.writeLongArray(shardUptimeInMillis);
121-
if (out.getTransportVersion().onOrAfter(TransportVersions.INDEX_METADATA_INCLUDES_RECENT_WRITE_LOAD)) {
158+
if (out.getTransportVersion().onOrAfter(INDEX_METADATA_INCLUDES_RECENT_WRITE_LOAD)) {
122159
out.writeDoubleArray(shardRecentWriteLoad);
123160
}
161+
if (out.getTransportVersion().onOrAfter(INDEX_STATS_AND_METADATA_INCLUDE_PEAK_WRITE_LOAD)) {
162+
out.writeDoubleArray(shardPeakWriteLoad);
163+
}
124164
}
125165

126166
@Override
127167
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
128168
builder.field(SHARDS_WRITE_LOAD_FIELD.getPreferredName(), shardWriteLoad);
129169
builder.field(SHARDS_UPTIME_IN_MILLIS.getPreferredName(), shardUptimeInMillis);
130170
builder.field(SHARDS_RECENT_WRITE_LOAD_FIELD.getPreferredName(), shardRecentWriteLoad);
171+
builder.field(SHARDS_PEAK_WRITE_LOAD_FIELD.getPreferredName(), shardPeakWriteLoad);
131172
return builder;
132173
}
133174

@@ -149,6 +190,13 @@ public OptionalDouble getRecentWriteLoadForShard(int shardId) {
149190
return load != UNKNOWN_LOAD ? OptionalDouble.of(load) : OptionalDouble.empty();
150191
}
151192

193+
public OptionalDouble getPeakWriteLoadForShard(int shardId) {
194+
assertShardInBounds(shardId);
195+
196+
double load = shardPeakWriteLoad[shardId];
197+
return load != UNKNOWN_LOAD ? OptionalDouble.of(load) : OptionalDouble.empty();
198+
}
199+
152200
public OptionalLong getUptimeInMillisForShard(int shardId) {
153201
assertShardInBounds(shardId);
154202

@@ -172,14 +220,16 @@ public boolean equals(Object o) {
172220
IndexWriteLoad that = (IndexWriteLoad) o;
173221
return Arrays.equals(shardWriteLoad, that.shardWriteLoad)
174222
&& Arrays.equals(shardUptimeInMillis, that.shardUptimeInMillis)
175-
&& Arrays.equals(shardRecentWriteLoad, that.shardRecentWriteLoad);
223+
&& Arrays.equals(shardRecentWriteLoad, that.shardRecentWriteLoad)
224+
&& Arrays.equals(shardPeakWriteLoad, that.shardPeakWriteLoad);
176225
}
177226

178227
@Override
179228
public int hashCode() {
180229
int result = Arrays.hashCode(shardWriteLoad);
181230
result = 31 * result + Arrays.hashCode(shardUptimeInMillis);
182231
result = 31 * result + Arrays.hashCode(shardRecentWriteLoad);
232+
result = 31 * result + Arrays.hashCode(shardPeakWriteLoad);
183233
return result;
184234
}
185235

@@ -192,30 +242,34 @@ public static class Builder {
192242
private final double[] shardWriteLoad;
193243
private final long[] uptimeInMillis;
194244
private final double[] shardRecentWriteLoad;
245+
private final double[] shardPeakWriteLoad;
195246

196247
private Builder(int numShards) {
197248
this.shardWriteLoad = new double[numShards];
198249
this.uptimeInMillis = new long[numShards];
199250
this.shardRecentWriteLoad = new double[numShards];
251+
this.shardPeakWriteLoad = new double[numShards];
200252
Arrays.fill(shardWriteLoad, UNKNOWN_LOAD);
201253
Arrays.fill(uptimeInMillis, UNKNOWN_UPTIME);
202254
Arrays.fill(shardRecentWriteLoad, UNKNOWN_LOAD);
255+
Arrays.fill(shardPeakWriteLoad, UNKNOWN_LOAD);
203256
}
204257

205-
public Builder withShardWriteLoad(int shardId, double load, double recentLoad, long uptimeInMillis) {
258+
public Builder withShardWriteLoad(int shardId, double load, double recentLoad, double peakLoad, long uptimeInMillis) {
206259
if (shardId >= this.shardWriteLoad.length) {
207260
throw new IllegalArgumentException();
208261
}
209262

210263
this.shardWriteLoad[shardId] = load;
211264
this.uptimeInMillis[shardId] = uptimeInMillis;
212265
this.shardRecentWriteLoad[shardId] = recentLoad;
266+
this.shardPeakWriteLoad[shardId] = peakLoad;
213267

214268
return this;
215269
}
216270

217271
public IndexWriteLoad build() {
218-
return new IndexWriteLoad(shardWriteLoad, uptimeInMillis, shardRecentWriteLoad);
272+
return new IndexWriteLoad(shardWriteLoad, uptimeInMillis, shardRecentWriteLoad, shardPeakWriteLoad);
219273
}
220274
}
221275
}

server/src/main/java/org/elasticsearch/index/shard/IndexingStats.java

Lines changed: 38 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
import java.util.concurrent.TimeUnit;
2727

2828
import static org.elasticsearch.TransportVersions.INDEXING_STATS_INCLUDES_RECENT_WRITE_LOAD;
29+
import static org.elasticsearch.TransportVersions.INDEX_STATS_AND_METADATA_INCLUDE_PEAK_WRITE_LOAD;
2930

3031
public class IndexingStats implements Writeable, ToXContentFragment {
3132

@@ -46,6 +47,7 @@ public static class Stats implements Writeable, ToXContentFragment {
4647
private long totalIndexingTimeSinceShardStartedInNanos;
4748
private long totalActiveTimeInNanos;
4849
private double recentIndexingLoad;
50+
private double peakIndexingLoad;
4951

5052
Stats() {}
5153

@@ -76,6 +78,15 @@ public Stats(StreamInput in) throws IOException {
7678
? (double) totalIndexingTimeSinceShardStartedInNanos / totalActiveTimeInNanos
7779
: 0;
7880
}
81+
if (in.getTransportVersion().onOrAfter(INDEX_STATS_AND_METADATA_INCLUDE_PEAK_WRITE_LOAD)) {
82+
peakIndexingLoad = in.readDouble();
83+
} else {
84+
// When getting stats from an older version which doesn't have the recent indexing load, better to fall back to the
85+
// unweighted write load, rather that assuming zero load:
86+
peakIndexingLoad = totalActiveTimeInNanos > 0
87+
? (double) totalIndexingTimeSinceShardStartedInNanos / totalActiveTimeInNanos
88+
: 0;
89+
}
7990
}
8091

8192
public Stats(
@@ -92,7 +103,8 @@ public Stats(
92103
long throttleTimeInMillis,
93104
long totalIndexingTimeSinceShardStartedInNanos,
94105
long totalActiveTimeInNanos,
95-
double recentIndexingLoad
106+
double recentIndexingLoad,
107+
double peakIndexingLoad
96108
) {
97109
this.indexCount = indexCount;
98110
this.indexTimeInMillis = indexTimeInMillis;
@@ -110,6 +122,7 @@ public Stats(
110122
this.totalActiveTimeInNanos = totalActiveTimeInNanos;
111123
// We store the weighted write load as a double because the calculation is inherently floating point
112124
this.recentIndexingLoad = recentIndexingLoad;
125+
this.peakIndexingLoad = peakIndexingLoad;
113126
}
114127

115128
public void add(Stats stats) {
@@ -131,11 +144,12 @@ public void add(Stats stats) {
131144
// N.B. getWriteLoad() returns the ratio of these sums, which is the average of the ratios weighted by active time:
132145
totalIndexingTimeSinceShardStartedInNanos += stats.totalIndexingTimeSinceShardStartedInNanos;
133146
totalActiveTimeInNanos += stats.totalActiveTimeInNanos;
134-
// We want getRecentWriteLoad() for the aggregated stats to also be the average weighted by active time, so we use the updating
135-
// formula for a weighted mean:
147+
// We want getRecentWriteLoad() and getPeakWriteLoad() for the aggregated stats to also be the average weighted by active time,
148+
// so we use the updating formula for a weighted mean:
136149
if (totalActiveTimeInNanos > 0) {
137150
recentIndexingLoad += (stats.recentIndexingLoad - recentIndexingLoad) * stats.totalActiveTimeInNanos
138151
/ totalActiveTimeInNanos;
152+
peakIndexingLoad += (stats.peakIndexingLoad - peakIndexingLoad) * stats.totalActiveTimeInNanos / totalActiveTimeInNanos;
139153
}
140154
}
141155

@@ -239,6 +253,20 @@ public double getRecentWriteLoad() {
239253
return recentIndexingLoad;
240254
}
241255

256+
/**
257+
* Returns a measurement of the peak write load.
258+
*
259+
* <p>If this {@link Stats} instance represents a single shard, this is the highest value that {@link #getRecentWriteLoad()} would
260+
* return for any of the instances created for this shard since it started (i.e. the highest value seen by any call to
261+
* {@link InternalIndexingStats#stats}).
262+
*
263+
* <p>If this {@link Stats} instance represents multiple shards, this is the average of that value for each shard, weighted by
264+
* the elapsed time for each shard. (N.B. This is the average of the peak values, <i>not</i> the peak of the average value.)
265+
*/
266+
public double getPeakWriteLoad() {
267+
return peakIndexingLoad;
268+
}
269+
242270
public long getTotalActiveTimeInMillis() {
243271
return TimeUnit.NANOSECONDS.toMillis(totalActiveTimeInNanos);
244272
}
@@ -265,6 +293,9 @@ public void writeTo(StreamOutput out) throws IOException {
265293
if (out.getTransportVersion().onOrAfter(INDEXING_STATS_INCLUDES_RECENT_WRITE_LOAD)) {
266294
out.writeDouble(recentIndexingLoad);
267295
}
296+
if (out.getTransportVersion().onOrAfter(INDEX_STATS_AND_METADATA_INCLUDE_PEAK_WRITE_LOAD)) {
297+
out.writeDouble(peakIndexingLoad);
298+
}
268299
}
269300

270301
@Override
@@ -286,6 +317,7 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
286317

287318
builder.field(Fields.WRITE_LOAD, getWriteLoad());
288319
builder.field(Fields.RECENT_WRITE_LOAD, getRecentWriteLoad());
320+
builder.field(Fields.PEAK_WRITE_LOAD, getPeakWriteLoad());
289321
return builder;
290322
}
291323

@@ -307,7 +339,8 @@ public boolean equals(Object o) {
307339
&& throttleTimeInMillis == that.throttleTimeInMillis
308340
&& totalIndexingTimeSinceShardStartedInNanos == that.totalIndexingTimeSinceShardStartedInNanos
309341
&& totalActiveTimeInNanos == that.totalActiveTimeInNanos
310-
&& recentIndexingLoad == that.recentIndexingLoad;
342+
&& recentIndexingLoad == that.recentIndexingLoad
343+
&& peakIndexingLoad == that.peakIndexingLoad;
311344
}
312345

313346
@Override
@@ -408,6 +441,7 @@ static final class Fields {
408441
static final String THROTTLED_TIME = "throttle_time";
409442
static final String WRITE_LOAD = "write_load";
410443
static final String RECENT_WRITE_LOAD = "recent_write_load";
444+
static final String PEAK_WRITE_LOAD = "peak_write_load";
411445
}
412446

413447
@Override

0 commit comments

Comments
 (0)