Skip to content

Commit 1bf5dde

Browse files
committed
Merge branch 'main' into fix-rollover-not-found
2 parents 5c13399 + 7a0a399 commit 1bf5dde

File tree

28 files changed

+345
-128
lines changed

28 files changed

+345
-128
lines changed

modules/data-streams/src/test/java/org/elasticsearch/datastreams/lifecycle/action/TransportGetDataStreamLifecycleStatsActionTests.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -52,7 +52,7 @@ public class TransportGetDataStreamLifecycleStatsActionTests extends ESTestCase
5252
mock(ThreadPool.class),
5353
mock(ActionFilters.class),
5454
dataStreamLifecycleService,
55-
TestProjectResolvers.singleProjectOnly()
55+
TestProjectResolvers.alwaysThrow()
5656
);
5757
private Long lastRunDuration;
5858
private Long timeBetweenStarts;

muted-tests.yml

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -393,9 +393,6 @@ tests:
393393
- class: org.elasticsearch.reservedstate.service.ReservedClusterStateServiceTests
394394
method: testProcessMultipleChunks
395395
issue: https://github.com/elastic/elasticsearch/issues/125305
396-
- class: org.elasticsearch.index.mapper.NativeArrayIntegrationTestCase
397-
method: testSynthesizeArrayRandomIgnoresMalformed
398-
issue: https://github.com/elastic/elasticsearch/issues/125319
399396

400397
# Examples:
401398
#

server/src/main/java/org/elasticsearch/TransportVersions.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -195,6 +195,7 @@ static TransportVersion def(int id) {
195195
public static final TransportVersion INTRODUCE_LIFECYCLE_TEMPLATE = def(9_033_0_00);
196196
public static final TransportVersion INDEXING_STATS_INCLUDES_RECENT_WRITE_LOAD = def(9_034_0_00);
197197
public static final TransportVersion ESQL_AGGREGATE_METRIC_DOUBLE_LITERAL = def(9_035_0_00);
198+
public static final TransportVersion INDEX_METADATA_INCLUDES_RECENT_WRITE_LOAD = def(9_036_0_00);
198199

199200
/*
200201
* STOP! READ THIS FIRST! No, really,

server/src/main/java/org/elasticsearch/action/datastreams/autosharding/DataStreamAutoShardingService.java

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -322,6 +322,17 @@ private AutoShardingResult getDecreaseShardsResult(
322322

323323
// Visible for testing
324324
static long computeOptimalNumberOfShards(int minNumberWriteThreads, int maxNumberWriteThreads, double indexingLoad) {
325+
/*
326+
* Define:
327+
* - shardsByMaxThreads = number of shards required to ensure no more than 50% utilization with max number of threads per shard
328+
* - shardsByMinThreads = number of shards required to ensure no more than 50% utilization with min number of threads per shard
329+
* Note that shardsByMaxThreads <= shardsByMinThreads.
330+
* This returns:
331+
* - shardsByMaxThreads if shardsByMaxThreads > 3
332+
* - 3 if shardsByMaxThreads <= 3 and shardsByMinThreads > 3
333+
* - shardsByMinThreads if 0 < shardsByMinThreads <= 3
334+
* - 1 if shardsByMinThreads == 0
335+
*/
325336
return Math.max(
326337
Math.max(
327338
Math.min(roundUp(indexingLoad / (minNumberWriteThreads / 2.0)), 3),

server/src/main/java/org/elasticsearch/cluster/metadata/IndexMetadataStats.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -108,6 +108,7 @@ public static IndexMetadataStats fromStatsResponse(IndexMetadata indexMetadata,
108108
indexWriteLoadBuilder.withShardWriteLoad(
109109
shardStats.getShardRouting().id(),
110110
indexingShardStats.getWriteLoad(),
111+
indexingShardStats.getRecentWriteLoad(),
111112
indexingShardStats.getTotalActiveTimeInMillis()
112113
);
113114
totalSizeInBytes += commonStats.getDocs().getTotalSizeInBytes();

server/src/main/java/org/elasticsearch/cluster/metadata/IndexWriteLoad.java

Lines changed: 67 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -9,9 +9,11 @@
99

1010
package org.elasticsearch.cluster.metadata;
1111

12+
import org.elasticsearch.TransportVersions;
1213
import org.elasticsearch.common.io.stream.StreamInput;
1314
import org.elasticsearch.common.io.stream.StreamOutput;
1415
import org.elasticsearch.common.io.stream.Writeable;
16+
import org.elasticsearch.core.Nullable;
1517
import org.elasticsearch.xcontent.ConstructingObjectParser;
1618
import org.elasticsearch.xcontent.ParseField;
1719
import org.elasticsearch.xcontent.ToXContentFragment;
@@ -27,24 +29,31 @@
2729
public class IndexWriteLoad implements Writeable, ToXContentFragment {
2830
public static final ParseField SHARDS_WRITE_LOAD_FIELD = new ParseField("loads");
2931
public static final ParseField SHARDS_UPTIME_IN_MILLIS = new ParseField("uptimes");
32+
public static final ParseField SHARDS_RECENT_WRITE_LOAD_FIELD = new ParseField("recent_loads");
3033
private static final Double UNKNOWN_LOAD = -1.0;
3134
private static final long UNKNOWN_UPTIME = -1;
3235

3336
@SuppressWarnings("unchecked")
3437
private static final ConstructingObjectParser<IndexWriteLoad, Void> PARSER = new ConstructingObjectParser<>(
3538
"index_write_load_parser",
3639
false,
37-
(args, unused) -> IndexWriteLoad.create((List<Double>) args[0], (List<Long>) args[1])
40+
(args, unused) -> IndexWriteLoad.create((List<Double>) args[0], (List<Long>) args[1], (List<Double>) args[2])
3841
);
3942

4043
static {
4144
PARSER.declareDoubleArray(ConstructingObjectParser.constructorArg(), SHARDS_WRITE_LOAD_FIELD);
4245
PARSER.declareLongArray(ConstructingObjectParser.constructorArg(), SHARDS_UPTIME_IN_MILLIS);
46+
// The recent write load field is optional so that we can parse XContent built by older versions which did not include it:
47+
PARSER.declareDoubleArray(ConstructingObjectParser.optionalConstructorArg(), SHARDS_RECENT_WRITE_LOAD_FIELD);
4348
}
4449

45-
public static IndexWriteLoad create(List<Double> shardsWriteLoad, List<Long> shardsUptimeInMillis) {
50+
private static IndexWriteLoad create(
51+
List<Double> shardsWriteLoad,
52+
List<Long> shardsUptimeInMillis,
53+
@Nullable List<Double> shardsRecentWriteLoad
54+
) {
4655
if (shardsWriteLoad.size() != shardsUptimeInMillis.size()) {
47-
assert false;
56+
assert false : "IndexWriteLoad.create() was called with non-matched lengths for shardWriteLoad and shardUptimeInMillis";
4857
throw new IllegalArgumentException(
4958
"The same number of shard write loads and shard uptimes should be provided, but "
5059
+ shardsWriteLoad
@@ -55,39 +64,70 @@ public static IndexWriteLoad create(List<Double> shardsWriteLoad, List<Long> sha
5564
}
5665

5766
if (shardsWriteLoad.isEmpty()) {
58-
assert false;
67+
assert false : "IndexWriteLoad.create() was called with empty shardsRecentWriteLoad";
68+
;
5969
throw new IllegalArgumentException("At least one shard write load and uptime should be provided, but none was provided");
6070
}
6171

72+
if (shardsRecentWriteLoad != null && shardsRecentWriteLoad.size() != shardsUptimeInMillis.size()) {
73+
assert false : "IndexWriteLoad.create() was called with non-matched lengths for shardsRecentWriteLoad and shardUptimeInMillis";
74+
throw new IllegalArgumentException(
75+
"The same number of shard write loads and shard uptimes should be provided, but "
76+
+ shardsWriteLoad
77+
+ " "
78+
+ shardsUptimeInMillis
79+
+ " were provided"
80+
);
81+
}
82+
6283
return new IndexWriteLoad(
6384
shardsWriteLoad.stream().mapToDouble(shardLoad -> shardLoad).toArray(),
64-
shardsUptimeInMillis.stream().mapToLong(shardUptime -> shardUptime).toArray()
85+
shardsUptimeInMillis.stream().mapToLong(shardUptime -> shardUptime).toArray(),
86+
shardsRecentWriteLoad != null ? shardsRecentWriteLoad.stream().mapToDouble(shardLoad -> shardLoad).toArray() : null
6587
);
6688
}
6789

6890
private final double[] shardWriteLoad;
6991
private final long[] shardUptimeInMillis;
92+
private final double[] shardRecentWriteLoad;
7093

71-
private IndexWriteLoad(double[] shardWriteLoad, long[] shardUptimeInMillis) {
72-
assert shardWriteLoad.length == shardUptimeInMillis.length;
94+
private IndexWriteLoad(double[] shardWriteLoad, long[] shardUptimeInMillis, @Nullable double[] shardRecentWriteLoad) {
95+
assert shardWriteLoad.length == shardUptimeInMillis.length
96+
: "IndexWriteLoad constructor was called with non-matched lengths for shardWriteLoad and shardUptimeInMillis";
7397
this.shardWriteLoad = shardWriteLoad;
7498
this.shardUptimeInMillis = shardUptimeInMillis;
99+
if (shardRecentWriteLoad != null) {
100+
assert shardRecentWriteLoad.length == shardUptimeInMillis.length
101+
: "IndexWriteLoad constructor was called with non-matched lengths for shardRecentWriteLoad and shardUptimeInMillis";
102+
this.shardRecentWriteLoad = shardRecentWriteLoad;
103+
} else {
104+
this.shardRecentWriteLoad = new double[shardUptimeInMillis.length];
105+
Arrays.fill(this.shardRecentWriteLoad, UNKNOWN_LOAD);
106+
}
75107
}
76108

77109
public IndexWriteLoad(StreamInput in) throws IOException {
78-
this(in.readDoubleArray(), in.readLongArray());
110+
this(
111+
in.readDoubleArray(),
112+
in.readLongArray(),
113+
in.getTransportVersion().onOrAfter(TransportVersions.INDEX_METADATA_INCLUDES_RECENT_WRITE_LOAD) ? in.readDoubleArray() : null
114+
);
79115
}
80116

81117
@Override
82118
public void writeTo(StreamOutput out) throws IOException {
83119
out.writeDoubleArray(shardWriteLoad);
84120
out.writeLongArray(shardUptimeInMillis);
121+
if (out.getTransportVersion().onOrAfter(TransportVersions.INDEX_METADATA_INCLUDES_RECENT_WRITE_LOAD)) {
122+
out.writeDoubleArray(shardRecentWriteLoad);
123+
}
85124
}
86125

87126
@Override
88127
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
89128
builder.field(SHARDS_WRITE_LOAD_FIELD.getPreferredName(), shardWriteLoad);
90129
builder.field(SHARDS_UPTIME_IN_MILLIS.getPreferredName(), shardUptimeInMillis);
130+
builder.field(SHARDS_RECENT_WRITE_LOAD_FIELD.getPreferredName(), shardRecentWriteLoad);
91131
return builder;
92132
}
93133

@@ -102,14 +142,20 @@ public OptionalDouble getWriteLoadForShard(int shardId) {
102142
return load != UNKNOWN_LOAD ? OptionalDouble.of(load) : OptionalDouble.empty();
103143
}
104144

145+
public OptionalDouble getRecentWriteLoadForShard(int shardId) {
146+
assertShardInBounds(shardId);
147+
148+
double load = shardRecentWriteLoad[shardId];
149+
return load != UNKNOWN_LOAD ? OptionalDouble.of(load) : OptionalDouble.empty();
150+
}
151+
105152
public OptionalLong getUptimeInMillisForShard(int shardId) {
106153
assertShardInBounds(shardId);
107154

108155
long uptime = shardUptimeInMillis[shardId];
109156
return uptime != UNKNOWN_UPTIME ? OptionalLong.of(uptime) : OptionalLong.empty();
110157
}
111158

112-
// Visible for testing
113159
public int numberOfShards() {
114160
return shardWriteLoad.length;
115161
}
@@ -124,13 +170,16 @@ public boolean equals(Object o) {
124170
if (this == o) return true;
125171
if (o == null || getClass() != o.getClass()) return false;
126172
IndexWriteLoad that = (IndexWriteLoad) o;
127-
return Arrays.equals(shardWriteLoad, that.shardWriteLoad) && Arrays.equals(shardUptimeInMillis, that.shardUptimeInMillis);
173+
return Arrays.equals(shardWriteLoad, that.shardWriteLoad)
174+
&& Arrays.equals(shardUptimeInMillis, that.shardUptimeInMillis)
175+
&& Arrays.equals(shardRecentWriteLoad, that.shardRecentWriteLoad);
128176
}
129177

130178
@Override
131179
public int hashCode() {
132180
int result = Arrays.hashCode(shardWriteLoad);
133181
result = 31 * result + Arrays.hashCode(shardUptimeInMillis);
182+
result = 31 * result + Arrays.hashCode(shardRecentWriteLoad);
134183
return result;
135184
}
136185

@@ -140,29 +189,33 @@ public static Builder builder(int numShards) {
140189
}
141190

142191
public static class Builder {
143-
final double[] shardWriteLoad;
144-
final long[] uptimeInMillis;
192+
private final double[] shardWriteLoad;
193+
private final long[] uptimeInMillis;
194+
private final double[] shardRecentWriteLoad;
145195

146196
private Builder(int numShards) {
147197
this.shardWriteLoad = new double[numShards];
148198
this.uptimeInMillis = new long[numShards];
199+
this.shardRecentWriteLoad = new double[numShards];
149200
Arrays.fill(shardWriteLoad, UNKNOWN_LOAD);
150201
Arrays.fill(uptimeInMillis, UNKNOWN_UPTIME);
202+
Arrays.fill(shardRecentWriteLoad, UNKNOWN_LOAD);
151203
}
152204

153-
public Builder withShardWriteLoad(int shardId, double load, long uptimeInMillis) {
205+
public Builder withShardWriteLoad(int shardId, double load, double recentLoad, long uptimeInMillis) {
154206
if (shardId >= this.shardWriteLoad.length) {
155207
throw new IllegalArgumentException();
156208
}
157209

158210
this.shardWriteLoad[shardId] = load;
159211
this.uptimeInMillis[shardId] = uptimeInMillis;
212+
this.shardRecentWriteLoad[shardId] = recentLoad;
160213

161214
return this;
162215
}
163216

164217
public IndexWriteLoad build() {
165-
return new IndexWriteLoad(shardWriteLoad, uptimeInMillis);
218+
return new IndexWriteLoad(shardWriteLoad, uptimeInMillis, shardRecentWriteLoad);
166219
}
167220
}
168221
}

server/src/main/java/org/elasticsearch/index/shard/IndexShard.java

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -298,7 +298,7 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
298298
private final RefreshFieldHasValueListener refreshFieldHasValueListener;
299299
private volatile boolean useRetentionLeasesInPeerRecovery;
300300
private final LongSupplier relativeTimeInNanosSupplier;
301-
private volatile long startedRelativeTimeInNanos;
301+
private volatile long startedRelativeTimeInNanos = -1L; // use -1 to indicate this has not yet been set to its true value
302302
private volatile long indexingTimeBeforeShardStartedInNanos;
303303
private volatile double recentIndexingLoadAtShardStarted;
304304
private final SubscribableListener<Void> waitForEngineOrClosedShardListeners = new SubscribableListener<>();
@@ -557,7 +557,10 @@ public void updateShardState(
557557
: "a primary relocation is completed by the master, but primary mode is not active " + currentRouting;
558558

559559
changeState(IndexShardState.STARTED, "global state is [" + newRouting.state() + "]");
560-
startedRelativeTimeInNanos = getRelativeTimeInNanos();
560+
long relativeTimeInNanos = getRelativeTimeInNanos();
561+
// We use -1 to indicate that startedRelativeTimeInNanos has yet not been set to its true value. So in the vanishingly
562+
// unlikely case that getRelativeTimeInNanos() returns exactly -1, we advance by 1ns to avoid that special value.
563+
startedRelativeTimeInNanos = (relativeTimeInNanos != -1L) ? relativeTimeInNanos : 0L;
561564
indexingTimeBeforeShardStartedInNanos = internalIndexingStats.totalIndexingTimeInNanos();
562565
recentIndexingLoadAtShardStarted = internalIndexingStats.recentIndexingLoad(startedRelativeTimeInNanos);
563566
} else if (currentRouting.primary()
@@ -1370,11 +1373,14 @@ public IndexingStats indexingStats() {
13701373
}
13711374

13721375
long currentTimeInNanos = getRelativeTimeInNanos();
1376+
// We use -1 to indicate that startedRelativeTimeInNanos has yet not been set to its true value, i.e the shard has not started.
1377+
// In that case, we set timeSinceShardStartedInNanos to zero (which will result in all load metrics definitely being zero).
1378+
long timeSinceShardStartedInNanos = (startedRelativeTimeInNanos != -1L) ? (currentTimeInNanos - startedRelativeTimeInNanos) : 0L;
13731379
return internalIndexingStats.stats(
13741380
throttled,
13751381
throttleTimeInMillis,
13761382
indexingTimeBeforeShardStartedInNanos,
1377-
currentTimeInNanos - startedRelativeTimeInNanos,
1383+
timeSinceShardStartedInNanos,
13781384
currentTimeInNanos,
13791385
recentIndexingLoadAtShardStarted
13801386
);

server/src/main/java/org/elasticsearch/index/shard/IndexingStatsSettings.java

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -14,16 +14,16 @@
1414
import org.elasticsearch.common.settings.Settings;
1515
import org.elasticsearch.core.TimeValue;
1616

17-
import java.util.concurrent.TimeUnit;
1817
import java.util.concurrent.atomic.AtomicReference;
1918

2019
/**
2120
* Container for cluster settings related to {@link IndexingStats}.
2221
*/
2322
public class IndexingStatsSettings {
2423

25-
// TODO: Change this default to something sensible:
26-
static final TimeValue RECENT_WRITE_LOAD_HALF_LIFE_DEFAULT = new TimeValue(10000, TimeUnit.DAYS);
24+
static final TimeValue RECENT_WRITE_LOAD_HALF_LIFE_DEFAULT = TimeValue.timeValueMinutes(5); // Aligns with the interval between DSL runs
25+
static final TimeValue RECENT_WRITE_LOAD_HALF_LIFE_MIN = TimeValue.timeValueSeconds(1); // A sub-second half-life makes no sense
26+
static final TimeValue RECENT_WRITE_LOAD_HALF_LIFE_MAX = TimeValue.timeValueDays(100_000); // Long.MAX_VALUE nanos, rounded down
2727

2828
/**
2929
* A cluster setting giving the half-life, in seconds, to use for the Exponentially Weighted Moving Rate calculation used for the
@@ -34,7 +34,8 @@ public class IndexingStatsSettings {
3434
public static final Setting<TimeValue> RECENT_WRITE_LOAD_HALF_LIFE_SETTING = Setting.timeSetting(
3535
"indices.stats.recent_write_load.half_life",
3636
RECENT_WRITE_LOAD_HALF_LIFE_DEFAULT,
37-
TimeValue.ZERO,
37+
RECENT_WRITE_LOAD_HALF_LIFE_MIN,
38+
RECENT_WRITE_LOAD_HALF_LIFE_MAX,
3839
Setting.Property.Dynamic,
3940
Setting.Property.NodeScope
4041
);

server/src/test/java/org/elasticsearch/action/ActionModuleTests.java

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -134,7 +134,7 @@ public void testSetupRestHandlerContainsKnownBuiltin() {
134134
List.of(),
135135
RestExtension.allowAll(),
136136
new IncrementalBulkService(null, null),
137-
TestProjectResolvers.singleProjectOnly()
137+
TestProjectResolvers.alwaysThrow()
138138
);
139139
actionModule.initRestHandlers(null, null);
140140
// At this point the easiest way to confirm that a handler is loaded is to try to register another one on top of it and to fail
@@ -201,7 +201,7 @@ public String getName() {
201201
List.of(),
202202
RestExtension.allowAll(),
203203
new IncrementalBulkService(null, null),
204-
TestProjectResolvers.singleProjectOnly()
204+
TestProjectResolvers.alwaysThrow()
205205
);
206206
Exception e = expectThrows(IllegalArgumentException.class, () -> actionModule.initRestHandlers(null, null));
207207
assertThat(e.getMessage(), startsWith("Cannot replace existing handler for [/_nodes] for method: GET"));
@@ -261,7 +261,7 @@ public List<RestHandler> getRestHandlers(
261261
List.of(),
262262
RestExtension.allowAll(),
263263
new IncrementalBulkService(null, null),
264-
TestProjectResolvers.singleProjectOnly()
264+
TestProjectResolvers.alwaysThrow()
265265
);
266266
actionModule.initRestHandlers(null, null);
267267
// At this point the easiest way to confirm that a handler is loaded is to try to register another one on top of it and to fail
@@ -314,7 +314,7 @@ public void test3rdPartyHandlerIsNotInstalled() {
314314
List.of(),
315315
RestExtension.allowAll(),
316316
new IncrementalBulkService(null, null),
317-
TestProjectResolvers.singleProjectOnly()
317+
TestProjectResolvers.alwaysThrow()
318318
)
319319
);
320320
assertThat(
@@ -358,7 +358,7 @@ public void test3rdPartyRestControllerIsNotInstalled() {
358358
List.of(),
359359
RestExtension.allowAll(),
360360
new IncrementalBulkService(null, null),
361-
TestProjectResolvers.singleProjectOnly()
361+
TestProjectResolvers.alwaysThrow()
362362
)
363363
);
364364
assertThat(

0 commit comments

Comments
 (0)