Skip to content

Commit b96b1c3

Browse files
Track the peak indexing load for each shard
This tracks the highest value seen for the recent write load metric any time the stats for a shard was computed, exposes this value alongside the recent value, and persists it in index metadata alongside it too. The new test in `IndexShardTests` is designed to more thoroughly test the recent write load metric previously added, as well as to test the peak metric being added here. ES-10037
1 parent 04d0a0a commit b96b1c3

File tree

15 files changed

+270
-62
lines changed

15 files changed

+270
-62
lines changed

server/src/main/java/org/elasticsearch/TransportVersions.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -199,6 +199,7 @@ static TransportVersion def(int id) {
199199
public static final TransportVersion INDEXING_STATS_INCLUDES_RECENT_WRITE_LOAD = def(9_034_0_00);
200200
public static final TransportVersion ESQL_AGGREGATE_METRIC_DOUBLE_LITERAL = def(9_035_0_00);
201201
public static final TransportVersion INDEX_METADATA_INCLUDES_RECENT_WRITE_LOAD = def(9_036_0_00);
202+
public static final TransportVersion INDEX_STATS_AND_METADATA_INCLUDE_PEAK_WRITE_LOAD = def(9_037_0_00);
202203

203204
/*
204205
* STOP! READ THIS FIRST! No, really,

server/src/main/java/org/elasticsearch/cluster/metadata/IndexMetadataStats.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -109,6 +109,7 @@ public static IndexMetadataStats fromStatsResponse(IndexMetadata indexMetadata,
109109
shardStats.getShardRouting().id(),
110110
indexingShardStats.getWriteLoad(),
111111
indexingShardStats.getRecentWriteLoad(),
112+
indexingShardStats.getPeakWriteLoad(),
112113
indexingShardStats.getTotalActiveTimeInMillis()
113114
);
114115
totalSizeInBytes += commonStats.getDocs().getTotalSizeInBytes();

server/src/main/java/org/elasticsearch/cluster/metadata/IndexWriteLoad.java

Lines changed: 54 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,6 @@
99

1010
package org.elasticsearch.cluster.metadata;
1111

12-
import org.elasticsearch.TransportVersions;
1312
import org.elasticsearch.common.io.stream.StreamInput;
1413
import org.elasticsearch.common.io.stream.StreamOutput;
1514
import org.elasticsearch.common.io.stream.Writeable;
@@ -26,31 +25,42 @@
2625
import java.util.OptionalDouble;
2726
import java.util.OptionalLong;
2827

28+
import static org.elasticsearch.TransportVersions.INDEX_METADATA_INCLUDES_RECENT_WRITE_LOAD;
29+
import static org.elasticsearch.TransportVersions.INDEX_STATS_AND_METADATA_INCLUDE_PEAK_WRITE_LOAD;
30+
2931
public class IndexWriteLoad implements Writeable, ToXContentFragment {
3032
public static final ParseField SHARDS_WRITE_LOAD_FIELD = new ParseField("loads");
3133
public static final ParseField SHARDS_UPTIME_IN_MILLIS = new ParseField("uptimes");
3234
public static final ParseField SHARDS_RECENT_WRITE_LOAD_FIELD = new ParseField("recent_loads");
35+
public static final ParseField SHARDS_PEAK_WRITE_LOAD_FIELD = new ParseField("peak_loads");
3336
private static final Double UNKNOWN_LOAD = -1.0;
3437
private static final long UNKNOWN_UPTIME = -1;
3538

3639
@SuppressWarnings("unchecked")
3740
private static final ConstructingObjectParser<IndexWriteLoad, Void> PARSER = new ConstructingObjectParser<>(
3841
"index_write_load_parser",
3942
false,
40-
(args, unused) -> IndexWriteLoad.create((List<Double>) args[0], (List<Long>) args[1], (List<Double>) args[2])
43+
(args, unused) -> IndexWriteLoad.create(
44+
(List<Double>) args[0],
45+
(List<Long>) args[1],
46+
(List<Double>) args[2],
47+
(List<Double>) args[3]
48+
)
4149
);
4250

4351
static {
4452
PARSER.declareDoubleArray(ConstructingObjectParser.constructorArg(), SHARDS_WRITE_LOAD_FIELD);
4553
PARSER.declareLongArray(ConstructingObjectParser.constructorArg(), SHARDS_UPTIME_IN_MILLIS);
46-
// The recent write load field is optional so that we can parse XContent built by older versions which did not include it:
54+
// The recent and peak write load fields are optional so that we can parse XContent built by older versions which did not have them:
4755
PARSER.declareDoubleArray(ConstructingObjectParser.optionalConstructorArg(), SHARDS_RECENT_WRITE_LOAD_FIELD);
56+
PARSER.declareDoubleArray(ConstructingObjectParser.optionalConstructorArg(), SHARDS_PEAK_WRITE_LOAD_FIELD);
4857
}
4958

5059
private static IndexWriteLoad create(
5160
List<Double> shardsWriteLoad,
5261
List<Long> shardsUptimeInMillis,
53-
@Nullable List<Double> shardsRecentWriteLoad
62+
@Nullable List<Double> shardsRecentWriteLoad,
63+
@Nullable List<Double> shardsPeakWriteLoad
5464
) {
5565
if (shardsWriteLoad.size() != shardsUptimeInMillis.size()) {
5666
assert false : "IndexWriteLoad.create() was called with non-matched lengths for shardWriteLoad and shardUptimeInMillis";
@@ -83,15 +93,22 @@ private static IndexWriteLoad create(
8393
return new IndexWriteLoad(
8494
shardsWriteLoad.stream().mapToDouble(shardLoad -> shardLoad).toArray(),
8595
shardsUptimeInMillis.stream().mapToLong(shardUptime -> shardUptime).toArray(),
86-
shardsRecentWriteLoad != null ? shardsRecentWriteLoad.stream().mapToDouble(shardLoad -> shardLoad).toArray() : null
96+
shardsRecentWriteLoad != null ? shardsRecentWriteLoad.stream().mapToDouble(shardLoad -> shardLoad).toArray() : null,
97+
shardsPeakWriteLoad != null ? shardsPeakWriteLoad.stream().mapToDouble(shardLoad -> shardLoad).toArray() : null
8798
);
8899
}
89100

90101
private final double[] shardWriteLoad;
91102
private final long[] shardUptimeInMillis;
92103
private final double[] shardRecentWriteLoad;
104+
private final double[] shardPeakWriteLoad;
93105

94-
private IndexWriteLoad(double[] shardWriteLoad, long[] shardUptimeInMillis, @Nullable double[] shardRecentWriteLoad) {
106+
private IndexWriteLoad(
107+
double[] shardWriteLoad,
108+
long[] shardUptimeInMillis,
109+
@Nullable double[] shardRecentWriteLoad,
110+
@Nullable double[] shardPeakWriteLoad
111+
) {
95112
assert shardWriteLoad.length == shardUptimeInMillis.length
96113
: "IndexWriteLoad constructor was called with non-matched lengths for shardWriteLoad and shardUptimeInMillis";
97114
this.shardWriteLoad = shardWriteLoad;
@@ -104,30 +121,43 @@ private IndexWriteLoad(double[] shardWriteLoad, long[] shardUptimeInMillis, @Nul
104121
this.shardRecentWriteLoad = new double[shardUptimeInMillis.length];
105122
Arrays.fill(this.shardRecentWriteLoad, UNKNOWN_LOAD);
106123
}
124+
if (shardPeakWriteLoad != null) {
125+
assert shardPeakWriteLoad.length == shardUptimeInMillis.length
126+
: "IndexWriteLoad constructor was called with non-matched lengths for shardRecentWriteLoad and shardUptimeInMillis";
127+
this.shardPeakWriteLoad = shardPeakWriteLoad;
128+
} else {
129+
this.shardPeakWriteLoad = new double[shardUptimeInMillis.length];
130+
Arrays.fill(this.shardPeakWriteLoad, UNKNOWN_LOAD);
131+
}
107132
}
108133

109134
public IndexWriteLoad(StreamInput in) throws IOException {
110135
this(
111136
in.readDoubleArray(),
112137
in.readLongArray(),
113-
in.getTransportVersion().onOrAfter(TransportVersions.INDEX_METADATA_INCLUDES_RECENT_WRITE_LOAD) ? in.readDoubleArray() : null
138+
in.getTransportVersion().onOrAfter(INDEX_METADATA_INCLUDES_RECENT_WRITE_LOAD) ? in.readDoubleArray() : null,
139+
in.getTransportVersion().onOrAfter(INDEX_STATS_AND_METADATA_INCLUDE_PEAK_WRITE_LOAD) ? in.readDoubleArray() : null
114140
);
115141
}
116142

117143
@Override
118144
public void writeTo(StreamOutput out) throws IOException {
119145
out.writeDoubleArray(shardWriteLoad);
120146
out.writeLongArray(shardUptimeInMillis);
121-
if (out.getTransportVersion().onOrAfter(TransportVersions.INDEX_METADATA_INCLUDES_RECENT_WRITE_LOAD)) {
147+
if (out.getTransportVersion().onOrAfter(INDEX_METADATA_INCLUDES_RECENT_WRITE_LOAD)) {
122148
out.writeDoubleArray(shardRecentWriteLoad);
123149
}
150+
if (out.getTransportVersion().onOrAfter(INDEX_STATS_AND_METADATA_INCLUDE_PEAK_WRITE_LOAD)) {
151+
out.writeDoubleArray(shardPeakWriteLoad);
152+
}
124153
}
125154

126155
@Override
127156
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
128157
builder.field(SHARDS_WRITE_LOAD_FIELD.getPreferredName(), shardWriteLoad);
129158
builder.field(SHARDS_UPTIME_IN_MILLIS.getPreferredName(), shardUptimeInMillis);
130159
builder.field(SHARDS_RECENT_WRITE_LOAD_FIELD.getPreferredName(), shardRecentWriteLoad);
160+
builder.field(SHARDS_PEAK_WRITE_LOAD_FIELD.getPreferredName(), shardPeakWriteLoad);
131161
return builder;
132162
}
133163

@@ -149,6 +179,13 @@ public OptionalDouble getRecentWriteLoadForShard(int shardId) {
149179
return load != UNKNOWN_LOAD ? OptionalDouble.of(load) : OptionalDouble.empty();
150180
}
151181

182+
public OptionalDouble getPeakWriteLoadForShard(int shardId) {
183+
assertShardInBounds(shardId);
184+
185+
double load = shardPeakWriteLoad[shardId];
186+
return load != UNKNOWN_LOAD ? OptionalDouble.of(load) : OptionalDouble.empty();
187+
}
188+
152189
public OptionalLong getUptimeInMillisForShard(int shardId) {
153190
assertShardInBounds(shardId);
154191

@@ -172,14 +209,16 @@ public boolean equals(Object o) {
172209
IndexWriteLoad that = (IndexWriteLoad) o;
173210
return Arrays.equals(shardWriteLoad, that.shardWriteLoad)
174211
&& Arrays.equals(shardUptimeInMillis, that.shardUptimeInMillis)
175-
&& Arrays.equals(shardRecentWriteLoad, that.shardRecentWriteLoad);
212+
&& Arrays.equals(shardRecentWriteLoad, that.shardRecentWriteLoad)
213+
&& Arrays.equals(shardPeakWriteLoad, that.shardPeakWriteLoad);
176214
}
177215

178216
@Override
179217
public int hashCode() {
180218
int result = Arrays.hashCode(shardWriteLoad);
181219
result = 31 * result + Arrays.hashCode(shardUptimeInMillis);
182220
result = 31 * result + Arrays.hashCode(shardRecentWriteLoad);
221+
result = 31 * result + Arrays.hashCode(shardPeakWriteLoad);
183222
return result;
184223
}
185224

@@ -192,30 +231,34 @@ public static class Builder {
192231
private final double[] shardWriteLoad;
193232
private final long[] uptimeInMillis;
194233
private final double[] shardRecentWriteLoad;
234+
private final double[] shardPeakWriteLoad;
195235

196236
private Builder(int numShards) {
197237
this.shardWriteLoad = new double[numShards];
198238
this.uptimeInMillis = new long[numShards];
199239
this.shardRecentWriteLoad = new double[numShards];
240+
this.shardPeakWriteLoad = new double[numShards];
200241
Arrays.fill(shardWriteLoad, UNKNOWN_LOAD);
201242
Arrays.fill(uptimeInMillis, UNKNOWN_UPTIME);
202243
Arrays.fill(shardRecentWriteLoad, UNKNOWN_LOAD);
244+
Arrays.fill(shardPeakWriteLoad, UNKNOWN_LOAD);
203245
}
204246

205-
public Builder withShardWriteLoad(int shardId, double load, double recentLoad, long uptimeInMillis) {
247+
public Builder withShardWriteLoad(int shardId, double load, double recentLoad, double peakLoad, long uptimeInMillis) {
206248
if (shardId >= this.shardWriteLoad.length) {
207249
throw new IllegalArgumentException();
208250
}
209251

210252
this.shardWriteLoad[shardId] = load;
211253
this.uptimeInMillis[shardId] = uptimeInMillis;
212254
this.shardRecentWriteLoad[shardId] = recentLoad;
255+
this.shardPeakWriteLoad[shardId] = peakLoad;
213256

214257
return this;
215258
}
216259

217260
public IndexWriteLoad build() {
218-
return new IndexWriteLoad(shardWriteLoad, uptimeInMillis, shardRecentWriteLoad);
261+
return new IndexWriteLoad(shardWriteLoad, uptimeInMillis, shardRecentWriteLoad, shardPeakWriteLoad);
219262
}
220263
}
221264
}

server/src/main/java/org/elasticsearch/index/shard/IndexingStats.java

Lines changed: 41 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
import java.util.concurrent.TimeUnit;
2727

2828
import static org.elasticsearch.TransportVersions.INDEXING_STATS_INCLUDES_RECENT_WRITE_LOAD;
29+
import static org.elasticsearch.TransportVersions.INDEX_STATS_AND_METADATA_INCLUDE_PEAK_WRITE_LOAD;
2930

3031
public class IndexingStats implements Writeable, ToXContentFragment {
3132

@@ -46,8 +47,10 @@ public static class Stats implements Writeable, ToXContentFragment {
4647
private long totalIndexingTimeSinceShardStartedInNanos;
4748
private long totalActiveTimeInNanos;
4849
private double recentIndexingLoad;
50+
private double peakIndexingLoad;
4951

50-
Stats() {}
52+
Stats() {
53+
}
5154

5255
public Stats(StreamInput in) throws IOException {
5356
indexCount = in.readVLong();
@@ -76,6 +79,15 @@ public Stats(StreamInput in) throws IOException {
7679
? (double) totalIndexingTimeSinceShardStartedInNanos / totalActiveTimeInNanos
7780
: 0;
7881
}
82+
if (in.getTransportVersion().onOrAfter(INDEX_STATS_AND_METADATA_INCLUDE_PEAK_WRITE_LOAD)) {
83+
peakIndexingLoad = in.readDouble();
84+
} else {
85+
// When getting stats from an older version which doesn't have the recent indexing load, better to fall back to the
86+
// unweighted write load, rather that assuming zero load:
87+
peakIndexingLoad = totalActiveTimeInNanos > 0
88+
? (double) totalIndexingTimeSinceShardStartedInNanos / totalActiveTimeInNanos
89+
: 0;
90+
}
7991
}
8092

8193
public Stats(
@@ -92,7 +104,8 @@ public Stats(
92104
long throttleTimeInMillis,
93105
long totalIndexingTimeSinceShardStartedInNanos,
94106
long totalActiveTimeInNanos,
95-
double recentIndexingLoad
107+
double recentIndexingLoad,
108+
double peakIndexingLoad
96109
) {
97110
this.indexCount = indexCount;
98111
this.indexTimeInMillis = indexTimeInMillis;
@@ -110,6 +123,7 @@ public Stats(
110123
this.totalActiveTimeInNanos = totalActiveTimeInNanos;
111124
// We store the weighted write load as a double because the calculation is inherently floating point
112125
this.recentIndexingLoad = recentIndexingLoad;
126+
this.peakIndexingLoad = peakIndexingLoad;
113127
}
114128

115129
public void add(Stats stats) {
@@ -131,11 +145,13 @@ public void add(Stats stats) {
131145
// N.B. getWriteLoad() returns the ratio of these sums, which is the average of the ratios weighted by active time:
132146
totalIndexingTimeSinceShardStartedInNanos += stats.totalIndexingTimeSinceShardStartedInNanos;
133147
totalActiveTimeInNanos += stats.totalActiveTimeInNanos;
134-
// We want getRecentWriteLoad() for the aggregated stats to also be the average weighted by active time, so we use the updating
135-
// formula for a weighted mean:
148+
// We want getRecentWriteLoad() and getPeakWriteLoad() for the aggregated stats to also be the average weighted by active time,
149+
// so we use the updating formula for a weighted mean:
136150
if (totalActiveTimeInNanos > 0) {
137151
recentIndexingLoad += (stats.recentIndexingLoad - recentIndexingLoad) * stats.totalActiveTimeInNanos
138152
/ totalActiveTimeInNanos;
153+
peakIndexingLoad += (stats.peakIndexingLoad - peakIndexingLoad) * stats.totalActiveTimeInNanos
154+
/ totalActiveTimeInNanos;
139155
}
140156
}
141157

@@ -239,6 +255,20 @@ public double getRecentWriteLoad() {
239255
return recentIndexingLoad;
240256
}
241257

258+
/**
259+
* Returns a measurement of the peak write load.
260+
*
261+
* <p>If this {@link Stats} instance represents a single shard, this is the highest value that {@link #getRecentWriteLoad()} would
262+
* return for any of the instances created for this shard since it started (i.e. the highest value seen by any call to
263+
* {@link InternalIndexingStats#stats}.
264+
*
265+
* <p>If this {@link Stats} instance represents multiple shards, this is the average of that value for each shard, weighted by
266+
* the elapsed time for each shard. (N.B. This is the average of the peak values, <i>not</i> the peak of the average value.)
267+
*/
268+
public double getPeakWriteLoad() {
269+
return peakIndexingLoad;
270+
}
271+
242272
public long getTotalActiveTimeInMillis() {
243273
return TimeUnit.NANOSECONDS.toMillis(totalActiveTimeInNanos);
244274
}
@@ -265,6 +295,9 @@ public void writeTo(StreamOutput out) throws IOException {
265295
if (out.getTransportVersion().onOrAfter(INDEXING_STATS_INCLUDES_RECENT_WRITE_LOAD)) {
266296
out.writeDouble(recentIndexingLoad);
267297
}
298+
if (out.getTransportVersion().onOrAfter(INDEX_STATS_AND_METADATA_INCLUDE_PEAK_WRITE_LOAD)) {
299+
out.writeDouble(peakIndexingLoad);
300+
}
268301
}
269302

270303
@Override
@@ -286,6 +319,7 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
286319

287320
builder.field(Fields.WRITE_LOAD, getWriteLoad());
288321
builder.field(Fields.RECENT_WRITE_LOAD, getRecentWriteLoad());
322+
builder.field(Fields.PEAK_WRITE_LOAD, getPeakWriteLoad());
289323
return builder;
290324
}
291325

@@ -307,7 +341,8 @@ public boolean equals(Object o) {
307341
&& throttleTimeInMillis == that.throttleTimeInMillis
308342
&& totalIndexingTimeSinceShardStartedInNanos == that.totalIndexingTimeSinceShardStartedInNanos
309343
&& totalActiveTimeInNanos == that.totalActiveTimeInNanos
310-
&& recentIndexingLoad == that.recentIndexingLoad;
344+
&& recentIndexingLoad == that.recentIndexingLoad
345+
&& peakIndexingLoad == that.peakIndexingLoad;
311346
}
312347

313348
@Override
@@ -408,6 +443,7 @@ static final class Fields {
408443
static final String THROTTLED_TIME = "throttle_time";
409444
static final String WRITE_LOAD = "write_load";
410445
static final String RECENT_WRITE_LOAD = "recent_write_load";
446+
static final String PEAK_WRITE_LOAD = "peak_write_load";
411447
}
412448

413449
@Override

server/src/main/java/org/elasticsearch/index/shard/InternalIndexingStats.java

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
import org.elasticsearch.index.engine.VersionConflictEngineException;
2222

2323
import java.util.concurrent.TimeUnit;
24+
import java.util.concurrent.atomic.AtomicReference;
2425
import java.util.function.LongSupplier;
2526

2627
import static org.elasticsearch.core.TimeValue.timeValueNanos;
@@ -155,6 +156,7 @@ void noopUpdate() {
155156
static class StatsHolder {
156157
private final MeanMetric indexMetric = new MeanMetric(); // Used for the count and total 'took' time (in ns) of index operations
157158
private final ExponentiallyWeightedMovingRate recentIndexMetric; // An EWMR of the total 'took' time of index operations (in ns)
159+
private final AtomicReference<Double> peakIndexMetric; // The peak value of the EWMR observed in any stats() call
158160
private final MeanMetric deleteMetric = new MeanMetric();
159161
private final CounterMetric indexCurrent = new CounterMetric();
160162
private final CounterMetric indexFailed = new CounterMetric();
@@ -170,6 +172,7 @@ static class StatsHolder {
170172
lambdaInInverseNanos
171173
);
172174
this.recentIndexMetric = new ExponentiallyWeightedMovingRate(lambdaInInverseNanos, startTimeInNanos);
175+
this.peakIndexMetric = new AtomicReference<>(0.0);
173176
}
174177

175178
IndexingStats.Stats stats(
@@ -189,15 +192,17 @@ IndexingStats.Stats stats(
189192
currentTimeInNanos - timeSinceShardStartedInNanos,
190193
recentIndexingLoadAtShardStarted
191194
);
195+
double peakIndexingLoad = peakIndexMetric.accumulateAndGet(recentIndexingLoadSinceShardStarted, Math::max);
192196
logger.debug(
193197
() -> Strings.format(
194198
"Generating stats for an index shard with indexing time %s and active time %s giving unweighted write load %g, "
195-
+ "while the recency-weighted write load is %g using a half-life of %s",
199+
+ "while the recency-weighted write load is %g using a half-life of %s and the peak write load is %g",
196200
timeValueNanos(totalIndexingTimeSinceShardStartedInNanos),
197201
timeValueNanos(timeSinceShardStartedInNanos),
198202
1.0 * totalIndexingTimeSinceShardStartedInNanos / timeSinceShardStartedInNanos,
199203
recentIndexingLoadSinceShardStarted,
200-
timeValueNanos((long) recentIndexMetric.getHalfLife())
204+
timeValueNanos((long) recentIndexMetric.getHalfLife()),
205+
peakIndexingLoad
201206
)
202207
);
203208
return new IndexingStats.Stats(
@@ -214,7 +219,8 @@ IndexingStats.Stats stats(
214219
TimeUnit.MILLISECONDS.toMillis(currentThrottleMillis),
215220
totalIndexingTimeSinceShardStartedInNanos,
216221
timeSinceShardStartedInNanos,
217-
recentIndexingLoadSinceShardStarted
222+
recentIndexingLoadSinceShardStarted,
223+
peakIndexingLoad
218224
);
219225
}
220226
}

server/src/test/java/org/elasticsearch/action/admin/cluster/node/stats/NodeStatsTests.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -592,6 +592,7 @@ private static CommonStats createShardLevelCommonStats() {
592592
++iota,
593593
++iota,
594594
++iota,
595+
++iota,
595596
++iota
596597
);
597598
indicesCommonStats.getIndexing().add(new IndexingStats(indexingStats));

0 commit comments

Comments
 (0)