Skip to content

Commit bf752ef

Browse files
Track the peak indexing load for each shard
This tracks the highest value seen for the recent write load metric any time the stats for a shard was computed, exposes this value alongside the recent value, and persists it in index metadata alongside it too. The new test in `IndexShardTests` is designed to more thoroughly test the recent write load metric previously added, as well as to test the peak metric being added here. ES-10037
1 parent 04d0a0a commit bf752ef

File tree

20 files changed

+308
-88
lines changed

20 files changed

+308
-88
lines changed

modules/data-streams/src/internalClusterTest/java/org/elasticsearch/datastreams/DataStreamAutoshardingIT.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -527,7 +527,7 @@ private static ShardStats getShardStats(IndexMetadata indexMeta, int shardIndex,
527527
CommonStats stats = new CommonStats();
528528
stats.docs = new DocsStats(100, 0, randomByteSizeValue().getBytes());
529529
stats.store = new StoreStats();
530-
stats.indexing = new IndexingStats(new IndexingStats.Stats(1, 1, 1, 1, 1, 1, 1, 1, 1, false, 1, targetWriteLoad, 1, 0.123));
530+
stats.indexing = new IndexingStats(new IndexingStats.Stats(1, 1, 1, 1, 1, 1, 1, 1, 1, false, 1, targetWriteLoad, 1, 0.123, 0.234));
531531
return new ShardStats(shardRouting, new ShardPath(false, path, path, shardId), stats, null, null, null, false, 0);
532532
}
533533

server/src/main/java/org/elasticsearch/TransportVersions.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -199,6 +199,7 @@ static TransportVersion def(int id) {
199199
public static final TransportVersion INDEXING_STATS_INCLUDES_RECENT_WRITE_LOAD = def(9_034_0_00);
200200
public static final TransportVersion ESQL_AGGREGATE_METRIC_DOUBLE_LITERAL = def(9_035_0_00);
201201
public static final TransportVersion INDEX_METADATA_INCLUDES_RECENT_WRITE_LOAD = def(9_036_0_00);
202+
public static final TransportVersion INDEX_STATS_AND_METADATA_INCLUDE_PEAK_WRITE_LOAD = def(9_037_0_00);
202203

203204
/*
204205
* STOP! READ THIS FIRST! No, really,

server/src/main/java/org/elasticsearch/cluster/metadata/IndexMetadataStats.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -109,6 +109,7 @@ public static IndexMetadataStats fromStatsResponse(IndexMetadata indexMetadata,
109109
shardStats.getShardRouting().id(),
110110
indexingShardStats.getWriteLoad(),
111111
indexingShardStats.getRecentWriteLoad(),
112+
indexingShardStats.getPeakWriteLoad(),
112113
indexingShardStats.getTotalActiveTimeInMillis()
113114
);
114115
totalSizeInBytes += commonStats.getDocs().getTotalSizeInBytes();

server/src/main/java/org/elasticsearch/cluster/metadata/IndexWriteLoad.java

Lines changed: 66 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,6 @@
99

1010
package org.elasticsearch.cluster.metadata;
1111

12-
import org.elasticsearch.TransportVersions;
1312
import org.elasticsearch.common.io.stream.StreamInput;
1413
import org.elasticsearch.common.io.stream.StreamOutput;
1514
import org.elasticsearch.common.io.stream.Writeable;
@@ -26,31 +25,42 @@
2625
import java.util.OptionalDouble;
2726
import java.util.OptionalLong;
2827

28+
import static org.elasticsearch.TransportVersions.INDEX_METADATA_INCLUDES_RECENT_WRITE_LOAD;
29+
import static org.elasticsearch.TransportVersions.INDEX_STATS_AND_METADATA_INCLUDE_PEAK_WRITE_LOAD;
30+
2931
public class IndexWriteLoad implements Writeable, ToXContentFragment {
3032
public static final ParseField SHARDS_WRITE_LOAD_FIELD = new ParseField("loads");
3133
public static final ParseField SHARDS_UPTIME_IN_MILLIS = new ParseField("uptimes");
3234
public static final ParseField SHARDS_RECENT_WRITE_LOAD_FIELD = new ParseField("recent_loads");
35+
public static final ParseField SHARDS_PEAK_WRITE_LOAD_FIELD = new ParseField("peak_loads");
3336
private static final Double UNKNOWN_LOAD = -1.0;
3437
private static final long UNKNOWN_UPTIME = -1;
3538

3639
@SuppressWarnings("unchecked")
3740
private static final ConstructingObjectParser<IndexWriteLoad, Void> PARSER = new ConstructingObjectParser<>(
3841
"index_write_load_parser",
3942
false,
40-
(args, unused) -> IndexWriteLoad.create((List<Double>) args[0], (List<Long>) args[1], (List<Double>) args[2])
43+
(args, unused) -> IndexWriteLoad.create(
44+
(List<Double>) args[0],
45+
(List<Long>) args[1],
46+
(List<Double>) args[2],
47+
(List<Double>) args[3]
48+
)
4149
);
4250

4351
static {
4452
PARSER.declareDoubleArray(ConstructingObjectParser.constructorArg(), SHARDS_WRITE_LOAD_FIELD);
4553
PARSER.declareLongArray(ConstructingObjectParser.constructorArg(), SHARDS_UPTIME_IN_MILLIS);
46-
// The recent write load field is optional so that we can parse XContent built by older versions which did not include it:
54+
// The recent and peak write load fields are optional so that we can parse XContent built by older versions which did not have them:
4755
PARSER.declareDoubleArray(ConstructingObjectParser.optionalConstructorArg(), SHARDS_RECENT_WRITE_LOAD_FIELD);
56+
PARSER.declareDoubleArray(ConstructingObjectParser.optionalConstructorArg(), SHARDS_PEAK_WRITE_LOAD_FIELD);
4857
}
4958

5059
private static IndexWriteLoad create(
5160
List<Double> shardsWriteLoad,
5261
List<Long> shardsUptimeInMillis,
53-
@Nullable List<Double> shardsRecentWriteLoad
62+
@Nullable List<Double> shardsRecentWriteLoad,
63+
@Nullable List<Double> shardsPeakWriteLoad
5464
) {
5565
if (shardsWriteLoad.size() != shardsUptimeInMillis.size()) {
5666
assert false : "IndexWriteLoad.create() was called with non-matched lengths for shardWriteLoad and shardUptimeInMillis";
@@ -73,7 +83,18 @@ private static IndexWriteLoad create(
7383
assert false : "IndexWriteLoad.create() was called with non-matched lengths for shardsRecentWriteLoad and shardUptimeInMillis";
7484
throw new IllegalArgumentException(
7585
"The same number of shard write loads and shard uptimes should be provided, but "
76-
+ shardsWriteLoad
86+
+ shardsRecentWriteLoad
87+
+ " "
88+
+ shardsUptimeInMillis
89+
+ " were provided"
90+
);
91+
}
92+
93+
if (shardsPeakWriteLoad != null && shardsPeakWriteLoad.size() != shardsUptimeInMillis.size()) {
94+
assert false : "IndexWriteLoad.create() was called with non-matched lengths for shardsPeakWriteLoad and shardUptimeInMillis";
95+
throw new IllegalArgumentException(
96+
"The same number of shard write loads and shard uptimes should be provided, but "
97+
+ shardsPeakWriteLoad
7798
+ " "
7899
+ shardsUptimeInMillis
79100
+ " were provided"
@@ -83,15 +104,22 @@ private static IndexWriteLoad create(
83104
return new IndexWriteLoad(
84105
shardsWriteLoad.stream().mapToDouble(shardLoad -> shardLoad).toArray(),
85106
shardsUptimeInMillis.stream().mapToLong(shardUptime -> shardUptime).toArray(),
86-
shardsRecentWriteLoad != null ? shardsRecentWriteLoad.stream().mapToDouble(shardLoad -> shardLoad).toArray() : null
107+
shardsRecentWriteLoad != null ? shardsRecentWriteLoad.stream().mapToDouble(shardLoad -> shardLoad).toArray() : null,
108+
shardsPeakWriteLoad != null ? shardsPeakWriteLoad.stream().mapToDouble(shardLoad -> shardLoad).toArray() : null
87109
);
88110
}
89111

90112
private final double[] shardWriteLoad;
91113
private final long[] shardUptimeInMillis;
92114
private final double[] shardRecentWriteLoad;
115+
private final double[] shardPeakWriteLoad;
93116

94-
private IndexWriteLoad(double[] shardWriteLoad, long[] shardUptimeInMillis, @Nullable double[] shardRecentWriteLoad) {
117+
private IndexWriteLoad(
118+
double[] shardWriteLoad,
119+
long[] shardUptimeInMillis,
120+
@Nullable double[] shardRecentWriteLoad,
121+
@Nullable double[] shardPeakWriteLoad
122+
) {
95123
assert shardWriteLoad.length == shardUptimeInMillis.length
96124
: "IndexWriteLoad constructor was called with non-matched lengths for shardWriteLoad and shardUptimeInMillis";
97125
this.shardWriteLoad = shardWriteLoad;
@@ -104,30 +132,43 @@ private IndexWriteLoad(double[] shardWriteLoad, long[] shardUptimeInMillis, @Nul
104132
this.shardRecentWriteLoad = new double[shardUptimeInMillis.length];
105133
Arrays.fill(this.shardRecentWriteLoad, UNKNOWN_LOAD);
106134
}
135+
if (shardPeakWriteLoad != null) {
136+
assert shardPeakWriteLoad.length == shardUptimeInMillis.length
137+
: "IndexWriteLoad constructor was called with non-matched lengths for shardPeakWriteLoad and shardUptimeInMillis";
138+
this.shardPeakWriteLoad = shardPeakWriteLoad;
139+
} else {
140+
this.shardPeakWriteLoad = new double[shardUptimeInMillis.length];
141+
Arrays.fill(this.shardPeakWriteLoad, UNKNOWN_LOAD);
142+
}
107143
}
108144

109145
public IndexWriteLoad(StreamInput in) throws IOException {
110146
this(
111147
in.readDoubleArray(),
112148
in.readLongArray(),
113-
in.getTransportVersion().onOrAfter(TransportVersions.INDEX_METADATA_INCLUDES_RECENT_WRITE_LOAD) ? in.readDoubleArray() : null
149+
in.getTransportVersion().onOrAfter(INDEX_METADATA_INCLUDES_RECENT_WRITE_LOAD) ? in.readDoubleArray() : null,
150+
in.getTransportVersion().onOrAfter(INDEX_STATS_AND_METADATA_INCLUDE_PEAK_WRITE_LOAD) ? in.readDoubleArray() : null
114151
);
115152
}
116153

117154
@Override
118155
public void writeTo(StreamOutput out) throws IOException {
119156
out.writeDoubleArray(shardWriteLoad);
120157
out.writeLongArray(shardUptimeInMillis);
121-
if (out.getTransportVersion().onOrAfter(TransportVersions.INDEX_METADATA_INCLUDES_RECENT_WRITE_LOAD)) {
158+
if (out.getTransportVersion().onOrAfter(INDEX_METADATA_INCLUDES_RECENT_WRITE_LOAD)) {
122159
out.writeDoubleArray(shardRecentWriteLoad);
123160
}
161+
if (out.getTransportVersion().onOrAfter(INDEX_STATS_AND_METADATA_INCLUDE_PEAK_WRITE_LOAD)) {
162+
out.writeDoubleArray(shardPeakWriteLoad);
163+
}
124164
}
125165

126166
@Override
127167
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
128168
builder.field(SHARDS_WRITE_LOAD_FIELD.getPreferredName(), shardWriteLoad);
129169
builder.field(SHARDS_UPTIME_IN_MILLIS.getPreferredName(), shardUptimeInMillis);
130170
builder.field(SHARDS_RECENT_WRITE_LOAD_FIELD.getPreferredName(), shardRecentWriteLoad);
171+
builder.field(SHARDS_PEAK_WRITE_LOAD_FIELD.getPreferredName(), shardPeakWriteLoad);
131172
return builder;
132173
}
133174

@@ -149,6 +190,13 @@ public OptionalDouble getRecentWriteLoadForShard(int shardId) {
149190
return load != UNKNOWN_LOAD ? OptionalDouble.of(load) : OptionalDouble.empty();
150191
}
151192

193+
public OptionalDouble getPeakWriteLoadForShard(int shardId) {
194+
assertShardInBounds(shardId);
195+
196+
double load = shardPeakWriteLoad[shardId];
197+
return load != UNKNOWN_LOAD ? OptionalDouble.of(load) : OptionalDouble.empty();
198+
}
199+
152200
public OptionalLong getUptimeInMillisForShard(int shardId) {
153201
assertShardInBounds(shardId);
154202

@@ -172,14 +220,16 @@ public boolean equals(Object o) {
172220
IndexWriteLoad that = (IndexWriteLoad) o;
173221
return Arrays.equals(shardWriteLoad, that.shardWriteLoad)
174222
&& Arrays.equals(shardUptimeInMillis, that.shardUptimeInMillis)
175-
&& Arrays.equals(shardRecentWriteLoad, that.shardRecentWriteLoad);
223+
&& Arrays.equals(shardRecentWriteLoad, that.shardRecentWriteLoad)
224+
&& Arrays.equals(shardPeakWriteLoad, that.shardPeakWriteLoad);
176225
}
177226

178227
@Override
179228
public int hashCode() {
180229
int result = Arrays.hashCode(shardWriteLoad);
181230
result = 31 * result + Arrays.hashCode(shardUptimeInMillis);
182231
result = 31 * result + Arrays.hashCode(shardRecentWriteLoad);
232+
result = 31 * result + Arrays.hashCode(shardPeakWriteLoad);
183233
return result;
184234
}
185235

@@ -192,30 +242,34 @@ public static class Builder {
192242
private final double[] shardWriteLoad;
193243
private final long[] uptimeInMillis;
194244
private final double[] shardRecentWriteLoad;
245+
private final double[] shardPeakWriteLoad;
195246

196247
private Builder(int numShards) {
197248
this.shardWriteLoad = new double[numShards];
198249
this.uptimeInMillis = new long[numShards];
199250
this.shardRecentWriteLoad = new double[numShards];
251+
this.shardPeakWriteLoad = new double[numShards];
200252
Arrays.fill(shardWriteLoad, UNKNOWN_LOAD);
201253
Arrays.fill(uptimeInMillis, UNKNOWN_UPTIME);
202254
Arrays.fill(shardRecentWriteLoad, UNKNOWN_LOAD);
255+
Arrays.fill(shardPeakWriteLoad, UNKNOWN_LOAD);
203256
}
204257

205-
public Builder withShardWriteLoad(int shardId, double load, double recentLoad, long uptimeInMillis) {
258+
public Builder withShardWriteLoad(int shardId, double load, double recentLoad, double peakLoad, long uptimeInMillis) {
206259
if (shardId >= this.shardWriteLoad.length) {
207260
throw new IllegalArgumentException();
208261
}
209262

210263
this.shardWriteLoad[shardId] = load;
211264
this.uptimeInMillis[shardId] = uptimeInMillis;
212265
this.shardRecentWriteLoad[shardId] = recentLoad;
266+
this.shardPeakWriteLoad[shardId] = peakLoad;
213267

214268
return this;
215269
}
216270

217271
public IndexWriteLoad build() {
218-
return new IndexWriteLoad(shardWriteLoad, uptimeInMillis, shardRecentWriteLoad);
272+
return new IndexWriteLoad(shardWriteLoad, uptimeInMillis, shardRecentWriteLoad, shardPeakWriteLoad);
219273
}
220274
}
221275
}

server/src/main/java/org/elasticsearch/index/shard/IndexingStats.java

Lines changed: 38 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
import java.util.concurrent.TimeUnit;
2727

2828
import static org.elasticsearch.TransportVersions.INDEXING_STATS_INCLUDES_RECENT_WRITE_LOAD;
29+
import static org.elasticsearch.TransportVersions.INDEX_STATS_AND_METADATA_INCLUDE_PEAK_WRITE_LOAD;
2930

3031
public class IndexingStats implements Writeable, ToXContentFragment {
3132

@@ -46,6 +47,7 @@ public static class Stats implements Writeable, ToXContentFragment {
4647
private long totalIndexingTimeSinceShardStartedInNanos;
4748
private long totalActiveTimeInNanos;
4849
private double recentIndexingLoad;
50+
private double peakIndexingLoad;
4951

5052
Stats() {}
5153

@@ -76,6 +78,15 @@ public Stats(StreamInput in) throws IOException {
7678
? (double) totalIndexingTimeSinceShardStartedInNanos / totalActiveTimeInNanos
7779
: 0;
7880
}
81+
if (in.getTransportVersion().onOrAfter(INDEX_STATS_AND_METADATA_INCLUDE_PEAK_WRITE_LOAD)) {
82+
peakIndexingLoad = in.readDouble();
83+
} else {
84+
// When getting stats from an older version which doesn't have the recent indexing load, better to fall back to the
85+
// unweighted write load, rather that assuming zero load:
86+
peakIndexingLoad = totalActiveTimeInNanos > 0
87+
? (double) totalIndexingTimeSinceShardStartedInNanos / totalActiveTimeInNanos
88+
: 0;
89+
}
7990
}
8091

8192
public Stats(
@@ -92,7 +103,8 @@ public Stats(
92103
long throttleTimeInMillis,
93104
long totalIndexingTimeSinceShardStartedInNanos,
94105
long totalActiveTimeInNanos,
95-
double recentIndexingLoad
106+
double recentIndexingLoad,
107+
double peakIndexingLoad
96108
) {
97109
this.indexCount = indexCount;
98110
this.indexTimeInMillis = indexTimeInMillis;
@@ -110,6 +122,7 @@ public Stats(
110122
this.totalActiveTimeInNanos = totalActiveTimeInNanos;
111123
// We store the weighted write load as a double because the calculation is inherently floating point
112124
this.recentIndexingLoad = recentIndexingLoad;
125+
this.peakIndexingLoad = peakIndexingLoad;
113126
}
114127

115128
public void add(Stats stats) {
@@ -131,11 +144,12 @@ public void add(Stats stats) {
131144
// N.B. getWriteLoad() returns the ratio of these sums, which is the average of the ratios weighted by active time:
132145
totalIndexingTimeSinceShardStartedInNanos += stats.totalIndexingTimeSinceShardStartedInNanos;
133146
totalActiveTimeInNanos += stats.totalActiveTimeInNanos;
134-
// We want getRecentWriteLoad() for the aggregated stats to also be the average weighted by active time, so we use the updating
135-
// formula for a weighted mean:
147+
// We want getRecentWriteLoad() and getPeakWriteLoad() for the aggregated stats to also be the average weighted by active time,
148+
// so we use the updating formula for a weighted mean:
136149
if (totalActiveTimeInNanos > 0) {
137150
recentIndexingLoad += (stats.recentIndexingLoad - recentIndexingLoad) * stats.totalActiveTimeInNanos
138151
/ totalActiveTimeInNanos;
152+
peakIndexingLoad += (stats.peakIndexingLoad - peakIndexingLoad) * stats.totalActiveTimeInNanos / totalActiveTimeInNanos;
139153
}
140154
}
141155

@@ -239,6 +253,20 @@ public double getRecentWriteLoad() {
239253
return recentIndexingLoad;
240254
}
241255

256+
/**
257+
* Returns a measurement of the peak write load.
258+
*
259+
* <p>If this {@link Stats} instance represents a single shard, this is the highest value that {@link #getRecentWriteLoad()} would
260+
* return for any of the instances created for this shard since it started (i.e. the highest value seen by any call to
261+
* {@link InternalIndexingStats#stats}).
262+
*
263+
* <p>If this {@link Stats} instance represents multiple shards, this is the average of that value for each shard, weighted by
264+
* the elapsed time for each shard. (N.B. This is the average of the peak values, <i>not</i> the peak of the average value.)
265+
*/
266+
public double getPeakWriteLoad() {
267+
return peakIndexingLoad;
268+
}
269+
242270
public long getTotalActiveTimeInMillis() {
243271
return TimeUnit.NANOSECONDS.toMillis(totalActiveTimeInNanos);
244272
}
@@ -265,6 +293,9 @@ public void writeTo(StreamOutput out) throws IOException {
265293
if (out.getTransportVersion().onOrAfter(INDEXING_STATS_INCLUDES_RECENT_WRITE_LOAD)) {
266294
out.writeDouble(recentIndexingLoad);
267295
}
296+
if (out.getTransportVersion().onOrAfter(INDEX_STATS_AND_METADATA_INCLUDE_PEAK_WRITE_LOAD)) {
297+
out.writeDouble(peakIndexingLoad);
298+
}
268299
}
269300

270301
@Override
@@ -286,6 +317,7 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
286317

287318
builder.field(Fields.WRITE_LOAD, getWriteLoad());
288319
builder.field(Fields.RECENT_WRITE_LOAD, getRecentWriteLoad());
320+
builder.field(Fields.PEAK_WRITE_LOAD, getPeakWriteLoad());
289321
return builder;
290322
}
291323

@@ -307,7 +339,8 @@ public boolean equals(Object o) {
307339
&& throttleTimeInMillis == that.throttleTimeInMillis
308340
&& totalIndexingTimeSinceShardStartedInNanos == that.totalIndexingTimeSinceShardStartedInNanos
309341
&& totalActiveTimeInNanos == that.totalActiveTimeInNanos
310-
&& recentIndexingLoad == that.recentIndexingLoad;
342+
&& recentIndexingLoad == that.recentIndexingLoad
343+
&& peakIndexingLoad == that.peakIndexingLoad;
311344
}
312345

313346
@Override
@@ -408,6 +441,7 @@ static final class Fields {
408441
static final String THROTTLED_TIME = "throttle_time";
409442
static final String WRITE_LOAD = "write_load";
410443
static final String RECENT_WRITE_LOAD = "recent_write_load";
444+
static final String PEAK_WRITE_LOAD = "peak_write_load";
411445
}
412446

413447
@Override

0 commit comments

Comments
 (0)