
Commit 694cfba

Committed by: elasticsearchmachine

Merge remote-tracking branch 'origin/main' into lucene_snapshot

2 parents: 15e5384 + 888e9a2

22 files changed: +305 -73 lines


modules/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3BlobContainer.java

Lines changed: 7 additions & 1 deletion
@@ -830,7 +830,13 @@ void run(BytesReference expected, BytesReference updated, ActionListener<Optiona
 
             .<Void>newForked(l -> ensureOtherUploadsComplete(uploadId, uploadIndex, currentUploads, l))
 
-            // Step 4: Read the current register value.
+            // Step 4: Read the current register value. Note that getRegister only has read-after-write semantics but that's ok here as:
+            // - all earlier uploads are now complete,
+            // - our upload is not completing yet, and
+            // - later uploads can only be completing if they have already aborted ours.
+            // Thus if our operation ultimately succeeds then there cannot have been any concurrent writes in flight, so this read
+            // cannot have observed a stale value, whereas if our operation ultimately fails then it doesn't matter what this read
+            // observes.
 
             .<OptionalBytesReference>andThen(l -> getRegister(purpose, rawKey, l))
 
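For context, the steps referenced in the new comment are chained asynchronously with SubscribableListener, as the surrounding lines show. The following is a hypothetical, simplified sketch (placeholder methods, not the real S3BlobContainer internals) of how that chaining puts the "ensure other uploads complete" step strictly before the register read, which is the ordering the read-after-write argument relies on.

import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.support.SubscribableListener;
import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.bytes.BytesReference;

// Hypothetical sketch, not the real S3BlobContainer code: it only illustrates how the
// listener chain sequences the steps so that the register read starts strictly after
// every earlier upload has completed, making read-after-write semantics sufficient.
class RegisterReadOrderingSketch {

    void run(ActionListener<BytesReference> outerListener) {
        SubscribableListener
            // Earlier step (assumed): wait for all other in-flight uploads to finish or abort.
            .<Void>newForked(this::ensureOtherUploadsComplete)
            // Step 4: only now read the register; no earlier write can still be in flight.
            .<BytesReference>andThen(this::readRegister)
            .addListener(outerListener);
    }

    // Placeholder for the real ensureOtherUploadsComplete(uploadId, uploadIndex, uploads, listener).
    void ensureOtherUploadsComplete(ActionListener<Void> listener) {
        listener.onResponse(null);
    }

    // Placeholder for the real getRegister(purpose, rawKey, listener).
    void readRegister(ActionListener<BytesReference> listener) {
        listener.onResponse(BytesArray.EMPTY);
    }
}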

muted-tests.yml

Lines changed: 6 additions & 0 deletions
@@ -481,6 +481,12 @@ tests:
 - class: org.elasticsearch.xpack.esql.action.CrossClusterQueryWithPartialResultsIT
   method: testPartialResults
   issue: https://github.com/elastic/elasticsearch/issues/131481
+- class: org.elasticsearch.packaging.test.DockerTests
+  method: test010Install
+  issue: https://github.com/elastic/elasticsearch/issues/131376
+- class: org.elasticsearch.test.rest.yaml.RcsCcsCommonYamlTestSuiteIT
+  method: test {p0=search/40_indices_boost/Indices boost with alias}
+  issue: https://github.com/elastic/elasticsearch/issues/131598
 
 # Examples:
 #

server/src/internalClusterTest/java/org/elasticsearch/cluster/SpecificMasterNodesIT.java

Lines changed: 12 additions & 8 deletions
@@ -60,26 +60,30 @@ public void testElectOnlyBetweenMasterNodes() throws Exception {
 
         logger.info("--> start master node (1)");
         final String masterNodeName = internalCluster().startMasterOnlyNode();
-        awaitMasterNode(internalCluster().getNonMasterNodeName(), masterNodeName);
-        awaitMasterNode(internalCluster().getMasterName(), masterNodeName);
+        for (var nodeName : internalCluster().getNodeNames()) {
+            awaitMasterNode(nodeName, masterNodeName);
+        }
 
         logger.info("--> start master node (2)");
         final String nextMasterEligableNodeName = internalCluster().startMasterOnlyNode();
-        awaitMasterNode(internalCluster().getNonMasterNodeName(), masterNodeName);
-        awaitMasterNode(internalCluster().getMasterName(), masterNodeName);
+        for (var nodeName : internalCluster().getNodeNames()) {
+            awaitMasterNode(nodeName, masterNodeName);
+        }
 
         logger.info("--> closing master node (1)");
         client().execute(
             TransportAddVotingConfigExclusionsAction.TYPE,
             new AddVotingConfigExclusionsRequest(TEST_REQUEST_TIMEOUT, masterNodeName)
         ).get();
         // removing the master from the voting configuration immediately triggers the master to step down
-        awaitMasterNode(internalCluster().getNonMasterNodeName(), nextMasterEligableNodeName);
-        awaitMasterNode(internalCluster().getMasterName(), nextMasterEligableNodeName);
+        for (var nodeName : internalCluster().getNodeNames()) {
+            awaitMasterNode(nodeName, nextMasterEligableNodeName);
+        }
 
         internalCluster().stopNode(masterNodeName);
-        awaitMasterNode(internalCluster().getNonMasterNodeName(), nextMasterEligableNodeName);
-        awaitMasterNode(internalCluster().getMasterName(), nextMasterEligableNodeName);
+        for (var nodeName : internalCluster().getNodeNames()) {
+            awaitMasterNode(nodeName, nextMasterEligableNodeName);
+        }
     }
 
     public void testAliasFilterValidation() {
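Since the same await-on-every-node loop now appears four times in this test, it could arguably be pulled into a small helper inside SpecificMasterNodesIT; a hypothetical sketch (the helper name is made up and is not part of this change):

    // Hypothetical helper, not part of this commit: waits until every node in the
    // internal test cluster agrees on the expected elected master.
    private void awaitMasterNodeOnAllNodes(String expectedMasterNodeName) {
        for (var nodeName : internalCluster().getNodeNames()) {
            awaitMasterNode(nodeName, expectedMasterNodeName);
        }
    }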

server/src/internalClusterTest/java/org/elasticsearch/index/shard/IndexShardIT.java

Lines changed: 58 additions & 0 deletions
@@ -27,6 +27,7 @@
 import org.elasticsearch.cluster.NodeUsageStatsForThreadPools;
 import org.elasticsearch.cluster.NodeUsageStatsForThreadPoolsCollector;
 import org.elasticsearch.cluster.metadata.IndexMetadata;
+import org.elasticsearch.cluster.metadata.ProjectId;
 import org.elasticsearch.cluster.node.DiscoveryNode;
 import org.elasticsearch.cluster.node.DiscoveryNodeUtils;
 import org.elasticsearch.cluster.routing.RecoverySource;
@@ -104,6 +105,7 @@
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.function.Predicate;
 import java.util.stream.Collectors;
+import java.util.stream.IntStream;
 import java.util.stream.Stream;
 
 import static com.carrotsearch.randomizedtesting.RandomizedTest.randomAsciiLettersOfLength;
@@ -355,6 +357,62 @@ public void testNodeWriteLoadsArePresent() {
         }
     }
 
+    public void testShardWriteLoadsArePresent() {
+        // Create some indices and some write-load
+        final int numIndices = randomIntBetween(1, 5);
+        final String indexPrefix = randomIdentifier();
+        IntStream.range(0, numIndices).forEach(i -> {
+            final String indexName = indexPrefix + "_" + i;
+            createIndex(indexName, Settings.builder().put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, randomIntBetween(1, 3)).build());
+            IntStream.range(0, randomIntBetween(1, 500))
+                .forEach(j -> prepareIndex(indexName).setSource("foo", randomIdentifier(), "bar", randomIdentifier()).get());
+        });
+
+        final InternalClusterInfoService clusterInfoService = (InternalClusterInfoService) getInstanceFromNode(ClusterInfoService.class);
+
+        // Not collecting stats yet because allocation write load stats collection is disabled by default.
+        {
+            ClusterInfoServiceUtils.refresh(clusterInfoService);
+            final Map<ShardId, Double> shardWriteLoads = clusterInfoService.getClusterInfo().getShardWriteLoads();
+            assertNotNull(shardWriteLoads);
+            assertTrue(shardWriteLoads.isEmpty());
+        }
+
+        // Turn on collection of write-load stats.
+        updateClusterSettings(
+            Settings.builder()
+                .put(
+                    WriteLoadConstraintSettings.WRITE_LOAD_DECIDER_ENABLED_SETTING.getKey(),
+                    WriteLoadConstraintSettings.WriteLoadDeciderStatus.ENABLED
+                )
+                .build()
+        );
+
+        try {
+            // Force a ClusterInfo refresh to run collection of the write-load stats.
+            ClusterInfoServiceUtils.refresh(clusterInfoService);
+            final Map<ShardId, Double> shardWriteLoads = clusterInfoService.getClusterInfo().getShardWriteLoads();
+
+            // Verify that each shard has write-load reported.
+            final ClusterState state = getInstanceFromNode(ClusterService.class).state();
+            assertEquals(state.projectState(ProjectId.DEFAULT).metadata().getTotalNumberOfShards(), shardWriteLoads.size());
+            double maximumLoadRecorded = 0;
+            for (IndexMetadata indexMetadata : state.projectState(ProjectId.DEFAULT).metadata()) {
+                for (int i = 0; i < indexMetadata.getNumberOfShards(); i++) {
+                    final ShardId shardId = new ShardId(indexMetadata.getIndex(), i);
+                    assertTrue(shardWriteLoads.containsKey(shardId));
+                    maximumLoadRecorded = Math.max(shardWriteLoads.get(shardId), maximumLoadRecorded);
+                }
+            }
+            // And that at least one is greater than zero
+            assertThat(maximumLoadRecorded, greaterThan(0.0));
+        } finally {
+            updateClusterSettings(
+                Settings.builder().putNull(WriteLoadConstraintSettings.WRITE_LOAD_DECIDER_ENABLED_SETTING.getKey()).build()
+            );
+        }
+    }
+
     public void testIndexCanChangeCustomDataPath() throws Exception {
         final String index = "test-custom-data-path";
         final Path sharedDataPath = getInstanceFromNode(Environment.class).sharedDataDir().resolve(randomAsciiLettersOfLength(10));
server/src/main/java/org/elasticsearch/TransportVersions.java

Lines changed: 1 addition & 0 deletions
@@ -344,6 +344,7 @@ static TransportVersion def(int id) {
     public static final TransportVersion ML_INFERENCE_AZURE_AI_STUDIO_RERANK_ADDED = def(9_123_0_00);
     public static final TransportVersion PROJECT_STATE_REGISTRY_ENTRY = def(9_124_0_00);
     public static final TransportVersion ML_INFERENCE_LLAMA_ADDED = def(9_125_0_00);
+    public static final TransportVersion SHARD_WRITE_LOAD_IN_CLUSTER_INFO = def(9_126_0_00);
 
     /*
      * STOP! READ THIS FIRST! No, really,
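The new constant only matters where it gates wire (de)serialization of the new field, which is exactly what the ClusterInfo change below does. A minimal, self-contained sketch of that gating pattern, using an illustrative record rather than the real ClusterInfo class:

import java.io.IOException;
import java.util.Map;

import org.elasticsearch.TransportVersions;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.index.shard.ShardId;

// Illustrative sketch of transport-version gating: older nodes neither send nor
// expect the new map, so mixed-version clusters keep working during a rolling upgrade.
record ShardWriteLoadsSketch(Map<ShardId, Double> shardWriteLoads) implements Writeable {

    static ShardWriteLoadsSketch readFrom(StreamInput in) throws IOException {
        if (in.getTransportVersion().onOrAfter(TransportVersions.SHARD_WRITE_LOAD_IN_CLUSTER_INFO)) {
            return new ShardWriteLoadsSketch(in.readImmutableMap(ShardId::new, StreamInput::readDouble));
        }
        return new ShardWriteLoadsSketch(Map.of()); // absent on older wire formats
    }

    @Override
    public void writeTo(StreamOutput out) throws IOException {
        if (out.getTransportVersion().onOrAfter(TransportVersions.SHARD_WRITE_LOAD_IN_CLUSTER_INFO)) {
            out.writeMap(shardWriteLoads, StreamOutput::writeWriteable, StreamOutput::writeDouble);
        }
    }
}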

server/src/main/java/org/elasticsearch/cluster/ClusterInfo.java

Lines changed: 39 additions & 7 deletions
@@ -41,7 +41,7 @@
 
 /**
  * ClusterInfo is an object representing a map of nodes to {@link DiskUsage}
- * and a map of shard ids to shard sizes, see
+ * and a map of shard ids to shard sizes and shard write-loads, see
  * <code>InternalClusterInfoService.shardIdentifierFromRouting(String)</code>
  * for the key used in the shardSizes map
  */
@@ -59,9 +59,10 @@ public class ClusterInfo implements ChunkedToXContent, Writeable {
     final Map<NodeAndPath, ReservedSpace> reservedSpace;
     final Map<String, EstimatedHeapUsage> estimatedHeapUsages;
     final Map<String, NodeUsageStatsForThreadPools> nodeUsageStatsForThreadPools;
+    final Map<ShardId, Double> shardWriteLoads;
 
     protected ClusterInfo() {
-        this(Map.of(), Map.of(), Map.of(), Map.of(), Map.of(), Map.of(), Map.of(), Map.of());
+        this(Map.of(), Map.of(), Map.of(), Map.of(), Map.of(), Map.of(), Map.of(), Map.of(), Map.of());
     }
 
     /**
@@ -85,7 +86,8 @@ public ClusterInfo(
         Map<NodeAndShard, String> dataPath,
         Map<NodeAndPath, ReservedSpace> reservedSpace,
         Map<String, EstimatedHeapUsage> estimatedHeapUsages,
-        Map<String, NodeUsageStatsForThreadPools> nodeUsageStatsForThreadPools
+        Map<String, NodeUsageStatsForThreadPools> nodeUsageStatsForThreadPools,
+        Map<ShardId, Double> shardWriteLoads
     ) {
         this.leastAvailableSpaceUsage = Map.copyOf(leastAvailableSpaceUsage);
         this.mostAvailableSpaceUsage = Map.copyOf(mostAvailableSpaceUsage);
@@ -95,6 +97,7 @@ public ClusterInfo(
         this.reservedSpace = Map.copyOf(reservedSpace);
         this.estimatedHeapUsages = Map.copyOf(estimatedHeapUsages);
         this.nodeUsageStatsForThreadPools = Map.copyOf(nodeUsageStatsForThreadPools);
+        this.shardWriteLoads = Map.copyOf(shardWriteLoads);
     }
 
     public ClusterInfo(StreamInput in) throws IOException {
@@ -116,6 +119,11 @@ public ClusterInfo(StreamInput in) throws IOException {
         } else {
             this.nodeUsageStatsForThreadPools = Map.of();
         }
+        if (in.getTransportVersion().onOrAfter(TransportVersions.SHARD_WRITE_LOAD_IN_CLUSTER_INFO)) {
+            this.shardWriteLoads = in.readImmutableMap(ShardId::new, StreamInput::readDouble);
+        } else {
+            this.shardWriteLoads = Map.of();
+        }
     }
 
     @Override
@@ -136,6 +144,9 @@ public void writeTo(StreamOutput out) throws IOException {
         if (out.getTransportVersion().onOrAfter(TransportVersions.NODE_USAGE_STATS_FOR_THREAD_POOLS_IN_CLUSTER_INFO)) {
             out.writeMap(this.nodeUsageStatsForThreadPools, StreamOutput::writeWriteable);
         }
+        if (out.getTransportVersion().onOrAfter(TransportVersions.SHARD_WRITE_LOAD_IN_CLUSTER_INFO)) {
+            out.writeMap(this.shardWriteLoads, StreamOutput::writeWriteable, StreamOutput::writeDouble);
+        }
     }
 
     /**
@@ -216,7 +227,7 @@ public Iterator<? extends ToXContent> toXContentChunked(ToXContent.Params params
                 return builder.endObject(); // NodeAndPath
             }),
             endArray() // end "reserved_sizes"
-            // NOTE: We don't serialize estimatedHeapUsages/nodeUsageStatsForThreadPools at this stage, to avoid
+            // NOTE: We don't serialize estimatedHeapUsages/nodeUsageStatsForThreadPools/shardWriteLoads at this stage, to avoid
            // committing to API payloads until the features are settled
         );
     }
@@ -255,6 +266,16 @@ public Map<String, DiskUsage> getNodeMostAvailableDiskUsages() {
         return this.mostAvailableSpaceUsage;
     }
 
+    /**
+     * Returns a map of shard IDs to the write-loads for use in balancing. The write-loads can be interpreted
+     * as the average number of threads that ingestion to the shard will consume.
+     * This information may be partial or missing altogether under some circumstances. The absence of a shard
+     * write load from the map should be interpreted as "unknown".
+     */
+    public Map<ShardId, Double> getShardWriteLoads() {
+        return shardWriteLoads;
+    }
+
     /**
      * Returns the shard size for the given shardId or <code>null</code> if that metric is not available.
      */
@@ -331,7 +352,9 @@ public boolean equals(Object o) {
             && shardDataSetSizes.equals(that.shardDataSetSizes)
             && dataPath.equals(that.dataPath)
             && reservedSpace.equals(that.reservedSpace)
-            && nodeUsageStatsForThreadPools.equals(that.nodeUsageStatsForThreadPools);
+            && estimatedHeapUsages.equals(that.estimatedHeapUsages)
+            && nodeUsageStatsForThreadPools.equals(that.nodeUsageStatsForThreadPools)
+            && shardWriteLoads.equals(that.shardWriteLoads);
     }
 
     @Override
@@ -343,7 +366,9 @@ public int hashCode() {
             shardDataSetSizes,
             dataPath,
             reservedSpace,
-            nodeUsageStatsForThreadPools
+            estimatedHeapUsages,
+            nodeUsageStatsForThreadPools,
+            shardWriteLoads
         );
     }
 
@@ -466,6 +491,7 @@ public static class Builder {
         private Map<NodeAndPath, ReservedSpace> reservedSpace = Map.of();
         private Map<String, EstimatedHeapUsage> estimatedHeapUsages = Map.of();
        private Map<String, NodeUsageStatsForThreadPools> nodeUsageStatsForThreadPools = Map.of();
+        private Map<ShardId, Double> shardWriteLoads = Map.of();
 
        public ClusterInfo build() {
            return new ClusterInfo(
@@ -476,7 +502,8 @@ public ClusterInfo build() {
                dataPath,
                reservedSpace,
                estimatedHeapUsages,
-               nodeUsageStatsForThreadPools
+               nodeUsageStatsForThreadPools,
+               shardWriteLoads
            );
        }
 
@@ -519,5 +546,10 @@ public Builder nodeUsageStatsForThreadPools(Map<String, NodeUsageStatsForThreadP
            this.nodeUsageStatsForThreadPools = nodeUsageStatsForThreadPools;
            return this;
        }
+
+        public Builder shardWriteLoads(Map<ShardId, Double> shardWriteLoads) {
+            this.shardWriteLoads = shardWriteLoads;
+            return this;
+        }
     }
 }
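Producers populate the new field through Builder#shardWriteLoads, and the new javadoc says consumers should treat a missing entry as an unknown write-load. A small consumer-side sketch (the class and helper method here are illustrative, not part of this commit):

import java.util.Map;

import org.elasticsearch.cluster.ClusterInfo;
import org.elasticsearch.index.shard.ShardId;

// Minimal consumer-side sketch: a missing entry means "write-load unknown",
// so callers fall back to a default rather than assuming zero load.
public final class ShardWriteLoadReader {

    private ShardWriteLoadReader() {}

    /** Returns the reported write-load for the shard, or the given fallback when it is unknown. */
    public static double writeLoadOrDefault(ClusterInfo clusterInfo, ShardId shardId, double fallback) {
        Map<ShardId, Double> writeLoads = clusterInfo.getShardWriteLoads();
        Double load = writeLoads.get(shardId);
        return load != null ? load : fallback;
    }
}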

server/src/main/java/org/elasticsearch/cluster/ClusterInfoSimulator.java

Lines changed: 2 additions & 1 deletion
@@ -159,7 +159,8 @@ public ClusterInfo getClusterInfo() {
             dataPath,
             Map.of(),
             estimatedHeapUsages,
-            nodeThreadPoolUsageStats
+            nodeThreadPoolUsageStats,
+            allocation.clusterInfo().getShardWriteLoads()
         );
     }
 }

0 commit comments
