-
Notifications
You must be signed in to change notification settings - Fork 25.6k
ES-10037 Persist recent write load in index metadata #125330
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 5 commits
6995190
15db2c8
1391c72
6392468
bfd85a7
025b7fd
1f1c954
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -9,9 +9,11 @@ | |
|
|
||
| package org.elasticsearch.cluster.metadata; | ||
|
|
||
| import org.elasticsearch.TransportVersions; | ||
| import org.elasticsearch.common.io.stream.StreamInput; | ||
| import org.elasticsearch.common.io.stream.StreamOutput; | ||
| import org.elasticsearch.common.io.stream.Writeable; | ||
| import org.elasticsearch.core.Nullable; | ||
| import org.elasticsearch.xcontent.ConstructingObjectParser; | ||
| import org.elasticsearch.xcontent.ParseField; | ||
| import org.elasticsearch.xcontent.ToXContentFragment; | ||
|
|
@@ -27,22 +29,29 @@ | |
| public class IndexWriteLoad implements Writeable, ToXContentFragment { | ||
| public static final ParseField SHARDS_WRITE_LOAD_FIELD = new ParseField("loads"); | ||
| public static final ParseField SHARDS_UPTIME_IN_MILLIS = new ParseField("uptimes"); | ||
| public static final ParseField SHARDS_RECENT_WRITE_LOAD_FIELD = new ParseField("recent_loads"); | ||
| private static final Double UNKNOWN_LOAD = -1.0; | ||
| private static final long UNKNOWN_UPTIME = -1; | ||
|
|
||
| @SuppressWarnings("unchecked") | ||
| private static final ConstructingObjectParser<IndexWriteLoad, Void> PARSER = new ConstructingObjectParser<>( | ||
| "index_write_load_parser", | ||
| false, | ||
| (args, unused) -> IndexWriteLoad.create((List<Double>) args[0], (List<Long>) args[1]) | ||
| (args, unused) -> IndexWriteLoad.create((List<Double>) args[0], (List<Long>) args[1], (List<Double>) args[2]) | ||
| ); | ||
|
|
||
| static { | ||
| PARSER.declareDoubleArray(ConstructingObjectParser.constructorArg(), SHARDS_WRITE_LOAD_FIELD); | ||
| PARSER.declareLongArray(ConstructingObjectParser.constructorArg(), SHARDS_UPTIME_IN_MILLIS); | ||
| // The recent write load field is optional so that we can parse XContent built by older versions which did not include it: | ||
| PARSER.declareDoubleArray(ConstructingObjectParser.optionalConstructorArg(), SHARDS_RECENT_WRITE_LOAD_FIELD); | ||
| } | ||
|
|
||
| public static IndexWriteLoad create(List<Double> shardsWriteLoad, List<Long> shardsUptimeInMillis) { | ||
| private static IndexWriteLoad create( | ||
| List<Double> shardsWriteLoad, | ||
| List<Long> shardsUptimeInMillis, | ||
| @Nullable List<Double> shardsRecentWriteLoad | ||
| ) { | ||
| if (shardsWriteLoad.size() != shardsUptimeInMillis.size()) { | ||
| assert false; | ||
| throw new IllegalArgumentException( | ||
|
|
@@ -59,35 +68,63 @@ public static IndexWriteLoad create(List<Double> shardsWriteLoad, List<Long> sha | |
| throw new IllegalArgumentException("At least one shard write load and uptime should be provided, but none was provided"); | ||
| } | ||
|
|
||
| if (shardsRecentWriteLoad != null && shardsRecentWriteLoad.size() != shardsUptimeInMillis.size()) { | ||
| assert false; | ||
|
||
| throw new IllegalArgumentException( | ||
| "The same number of shard write loads and shard uptimes should be provided, but " | ||
| + shardsWriteLoad | ||
| + " " | ||
| + shardsUptimeInMillis | ||
| + " were provided" | ||
| ); | ||
| } | ||
|
|
||
| return new IndexWriteLoad( | ||
| shardsWriteLoad.stream().mapToDouble(shardLoad -> shardLoad).toArray(), | ||
| shardsUptimeInMillis.stream().mapToLong(shardUptime -> shardUptime).toArray() | ||
| shardsUptimeInMillis.stream().mapToLong(shardUptime -> shardUptime).toArray(), | ||
| shardsRecentWriteLoad != null ? shardsRecentWriteLoad.stream().mapToDouble(shardLoad -> shardLoad).toArray() : null | ||
| ); | ||
| } | ||
|
|
||
| private final double[] shardWriteLoad; | ||
| private final long[] shardUptimeInMillis; | ||
| private final double[] shardRecentWriteLoad; | ||
|
|
||
| private IndexWriteLoad(double[] shardWriteLoad, long[] shardUptimeInMillis) { | ||
| private IndexWriteLoad(double[] shardWriteLoad, long[] shardUptimeInMillis, @Nullable double[] shardRecentWriteLoad) { | ||
| assert shardWriteLoad.length == shardUptimeInMillis.length; | ||
| this.shardWriteLoad = shardWriteLoad; | ||
| this.shardUptimeInMillis = shardUptimeInMillis; | ||
| if (shardRecentWriteLoad != null) { | ||
| assert shardRecentWriteLoad.length == shardUptimeInMillis.length; | ||
|
||
| this.shardRecentWriteLoad = shardRecentWriteLoad; | ||
| } else { | ||
| this.shardRecentWriteLoad = new double[shardUptimeInMillis.length]; | ||
| Arrays.fill(this.shardRecentWriteLoad, UNKNOWN_LOAD); | ||
| } | ||
| } | ||
|
|
||
| public IndexWriteLoad(StreamInput in) throws IOException { | ||
| this(in.readDoubleArray(), in.readLongArray()); | ||
| this( | ||
| in.readDoubleArray(), | ||
| in.readLongArray(), | ||
| in.getTransportVersion().onOrAfter(TransportVersions.INDEX_METADATA_INCLUDES_RECENT_WRITE_LOAD) ? in.readDoubleArray() : null | ||
| ); | ||
| } | ||
|
|
||
| @Override | ||
| public void writeTo(StreamOutput out) throws IOException { | ||
| out.writeDoubleArray(shardWriteLoad); | ||
| out.writeLongArray(shardUptimeInMillis); | ||
| if (out.getTransportVersion().onOrAfter(TransportVersions.INDEX_METADATA_INCLUDES_RECENT_WRITE_LOAD)) { | ||
| out.writeDoubleArray(shardRecentWriteLoad); | ||
| } | ||
| } | ||
|
|
||
| @Override | ||
| public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { | ||
| builder.field(SHARDS_WRITE_LOAD_FIELD.getPreferredName(), shardWriteLoad); | ||
| builder.field(SHARDS_UPTIME_IN_MILLIS.getPreferredName(), shardUptimeInMillis); | ||
| builder.field(SHARDS_RECENT_WRITE_LOAD_FIELD.getPreferredName(), shardRecentWriteLoad); | ||
| return builder; | ||
| } | ||
|
|
||
|
|
@@ -102,14 +139,20 @@ public OptionalDouble getWriteLoadForShard(int shardId) { | |
| return load != UNKNOWN_LOAD ? OptionalDouble.of(load) : OptionalDouble.empty(); | ||
| } | ||
|
|
||
| public OptionalDouble getRecentWriteLoadForShard(int shardId) { | ||
| assertShardInBounds(shardId); | ||
|
|
||
| double load = shardRecentWriteLoad[shardId]; | ||
| return load != UNKNOWN_LOAD ? OptionalDouble.of(load) : OptionalDouble.empty(); | ||
| } | ||
|
|
||
| public OptionalLong getUptimeInMillisForShard(int shardId) { | ||
| assertShardInBounds(shardId); | ||
|
|
||
| long uptime = shardUptimeInMillis[shardId]; | ||
| return uptime != UNKNOWN_UPTIME ? OptionalLong.of(uptime) : OptionalLong.empty(); | ||
| } | ||
|
|
||
| // Visible for testing | ||
| public int numberOfShards() { | ||
| return shardWriteLoad.length; | ||
| } | ||
|
|
@@ -124,13 +167,16 @@ public boolean equals(Object o) { | |
| if (this == o) return true; | ||
| if (o == null || getClass() != o.getClass()) return false; | ||
| IndexWriteLoad that = (IndexWriteLoad) o; | ||
| return Arrays.equals(shardWriteLoad, that.shardWriteLoad) && Arrays.equals(shardUptimeInMillis, that.shardUptimeInMillis); | ||
| return Arrays.equals(shardWriteLoad, that.shardWriteLoad) | ||
| && Arrays.equals(shardUptimeInMillis, that.shardUptimeInMillis) | ||
| && Arrays.equals(shardRecentWriteLoad, that.shardRecentWriteLoad); | ||
| } | ||
|
|
||
| @Override | ||
| public int hashCode() { | ||
| int result = Arrays.hashCode(shardWriteLoad); | ||
| result = 31 * result + Arrays.hashCode(shardUptimeInMillis); | ||
| result = 31 * result + Arrays.hashCode(shardRecentWriteLoad); | ||
| return result; | ||
| } | ||
|
|
||
|
|
@@ -140,29 +186,33 @@ public static Builder builder(int numShards) { | |
| } | ||
|
|
||
| public static class Builder { | ||
| final double[] shardWriteLoad; | ||
| final long[] uptimeInMillis; | ||
| private final double[] shardWriteLoad; | ||
| private final long[] uptimeInMillis; | ||
| private final double[] shardRecentWriteLoad; | ||
|
|
||
| private Builder(int numShards) { | ||
| this.shardWriteLoad = new double[numShards]; | ||
| this.uptimeInMillis = new long[numShards]; | ||
| this.shardRecentWriteLoad = new double[numShards]; | ||
| Arrays.fill(shardWriteLoad, UNKNOWN_LOAD); | ||
| Arrays.fill(uptimeInMillis, UNKNOWN_UPTIME); | ||
| Arrays.fill(shardRecentWriteLoad, UNKNOWN_LOAD); | ||
| } | ||
|
|
||
| public Builder withShardWriteLoad(int shardId, double load, long uptimeInMillis) { | ||
| public Builder withShardWriteLoad(int shardId, double load, double recentLoad, long uptimeInMillis) { | ||
| if (shardId >= this.shardWriteLoad.length) { | ||
| throw new IllegalArgumentException(); | ||
| } | ||
|
|
||
| this.shardWriteLoad[shardId] = load; | ||
| this.uptimeInMillis[shardId] = uptimeInMillis; | ||
| this.shardRecentWriteLoad[shardId] = recentLoad; | ||
|
|
||
| return this; | ||
| } | ||
|
|
||
| public IndexWriteLoad build() { | ||
| return new IndexWriteLoad(shardWriteLoad, uptimeInMillis); | ||
| return new IndexWriteLoad(shardWriteLoad, uptimeInMillis, shardRecentWriteLoad); | ||
| } | ||
| } | ||
| } | ||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
These both say "max" in the name, but I assume one of them should be
shardsByMinThreads?There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Whoops, good catch. Me and my cut-and-paste again...